use std::collections::VecDeque;
use std::net::SocketAddr;
use std::time::{Duration, Instant};
use bytes::Bytes;
use log::debug;
use log::{trace, warn};
use super::TimeSpan;
use crate::loss_compression::decompress_loss_list;
use crate::packet::{AckControlInfo, ControlTypes, HandshakeControlInfo, SrtControlPacket};
use crate::protocol::handshake::Handshake;
use crate::protocol::sender::buffers::*;
use crate::protocol::Timer;
use crate::Packet::*;
use crate::{
CCData, CongestCtrl, ConnectionSettings, ControlPacket, DataPacket, Packet, SeqNumber,
SrtCongestCtrl,
};
mod buffers;
/// Error type carried by [`SenderResult`].
///
/// The enum has no variants, so a value of this type cannot be constructed.
#[derive(Debug)]
pub enum SenderError {}
/// Result returned by the sender's packet handlers: `Ok(())` or a [`SenderError`].
pub type SenderResult = Result<(), SenderError>;
/// Statistics the sender maintains while processing ACK packets.
#[derive(Debug, Clone, Copy)]
pub struct SenderMetrics {
/// Round-trip time, taken from received ACK packets.
pub rtt: TimeSpan,
/// Round-trip time variance, taken from received ACK packets.
pub rtt_var: TimeSpan,
/// Running average of the packet receive rate reported in ACK packets.
pub pkt_arr_rate: u32,
/// Running average of the estimated link capacity reported in ACK packets.
pub est_link_cap: i32,
/// Lost packet counter. NOTE(review): nothing in this file updates it — confirm intended use.
pub lost_packets: u32,
/// Incremented on each accepted ACK by the count the loss list returns when it drops
/// acknowledged packets.
pub retrans_packets: u32,
/// Incremented on each accepted ACK by the distance between the new and the previous
/// acknowledged sequence number.
pub recvd_packets: u32,
}
impl SenderMetrics {
pub fn new() -> Self {
Self {
rtt: TimeSpan::from_micros(10_000),
rtt_var: TimeSpan::from_micros(0),
pkt_arr_rate: 0,
est_link_cap: 0,
lost_packets: 0,
retrans_packets: 0,
recvd_packets: 0,
}
}
}
/// What the caller of [`Sender::next_action`] should do next.
///
/// The type is a small value (`Copy`) and comparable (`PartialEq`/`Eq`) so callers and tests
/// can match on or assert against the returned action directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SenderAlgorithmAction {
    /// The flow window is exhausted; nothing more can be sent until an ACK arrives.
    WaitUntilAck,
    /// There is nothing new to transmit right now (also returned right after a packet from
    /// the loss list has been queued for retransmission).
    WaitForData,
    /// Call `next_action` again at the given instant.
    WaitUntil(Instant),
    /// A close was requested, everything is flushed and a shutdown packet has been queued.
    Close,
}
/// Position of the sender within its send loop.
///
/// Fieldless, so it is `Copy` and `Eq` in addition to the comparison the sender already
/// relies on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SenderAlgorithmStep {
    /// The sender may transmit on the next call to `next_action`.
    Step1,
    /// The sender has transmitted and is waiting for the send timer to expire.
    Step6,
}
/// Sending side of a connection: queues outgoing data, reacts to packets from the peer and
/// decides, via `next_action`, when packets are emitted.
pub struct Sender {
/// Connection parameters; `remote` filters incoming packets and addresses outgoing ones.
settings: ConnectionSettings,
/// Handshake state, consulted when a handshake control packet arrives.
handshake: Handshake,
/// Congestion controller; supplies the window size and the send interval.
congestion_control: SrtCongestCtrl,
/// Statistics updated while handling ACK packets.
metrics: SenderMetrics,
/// Packets that have been sent; looked up on NAK and released on ACK.
send_buffer: SendBuffer,
/// Packets ready to be handed to the caller through `pop_output`.
output_buffer: VecDeque<Packet>,
/// Messages queued through `handle_data` that have not been sent yet.
transmit_buffer: TransmitBuffer,
/// Packets named in NAKs from the peer, waiting to be retransmitted.
loss_list: LossList,
/// `ack_number` of the most recently accepted ACK (starts at the initial sequence number).
lr_acked_packet: SeqNumber,
/// `ack_seq_num` of the most recently accepted ACK (starts at -1).
lr_acked_ack: i32,
/// Current position in the send loop.
step: SenderAlgorithmStep,
/// Timer that paces transmissions.
snd_timer: Timer,
/// Set once a close was requested locally or a shutdown arrived from the peer.
close: bool,
}
impl Default for SenderMetrics {
/// Returns the same value as `SenderMetrics::new`.
fn default() -> Self {
Self::new()
}
}
impl Sender {
pub fn new(
settings: ConnectionSettings,
handshake: Handshake,
congestion_control: SrtCongestCtrl,
) -> Self {
Self {
settings,
handshake,
congestion_control,
metrics: SenderMetrics::new(),
send_buffer: SendBuffer::new(&settings),
loss_list: LossList::new(&settings),
lr_acked_packet: settings.init_seq_num,
lr_acked_ack: -1,
output_buffer: VecDeque::new(),
transmit_buffer: TransmitBuffer::new(&settings),
step: SenderAlgorithmStep::Step1,
snd_timer: Timer::new(Duration::from_millis(1), settings.socket_start_time),
close: false,
}
}
/// Returns the connection settings this sender was created with.
pub fn settings(&self) -> &ConnectionSettings {
&self.settings
}
/// Requests that the sender close.
///
/// Only the flag is set here; `next_action` reports the close once everything has been
/// flushed. Calling this more than once has no further effect.
pub fn handle_close(&mut self) {
    self.close = true;
}
/// Queues a message, paired with an `Instant`, in the transmit buffer.
pub fn handle_data(&mut self, data: (Instant, Bytes)) {
self.transmit_buffer.push_message(data);
}
/// Called when the send timer has expired: resets the timer at `now` and puts the send loop
/// back in `Step1` so that `next_action` may transmit again.
fn handle_snd_timer(&mut self, now: Instant) {
self.snd_timer.reset(now);
self.step = SenderAlgorithmStep::Step1;
}
/// Feeds a packet received from the network into the sender.
///
/// Packets whose source address is not the connection's remote address are dropped and
/// reported as success. Everything else is logged and dispatched by packet kind.
pub fn handle_packet(
    &mut self,
    (packet, from): (Packet, SocketAddr),
    now: Instant,
) -> SenderResult {
    if from == self.settings.remote {
        log::info!("Received packet {:?}", packet);
        match packet {
            Data(payload) => self.handle_data_packet(payload, now),
            Control(ctrl) => self.handle_control_packet(ctrl, now),
        }
    } else {
        Ok(())
    }
}
/// Reports whether the sender has nothing left to do.
///
/// That is the case when the loss list, transmit buffer, send buffer and output buffer are
/// all empty and the last acknowledged packet equals the next sequence number to be sent.
pub fn is_flushed(&mut self) -> bool {
    if !self.loss_list.is_empty() || !self.transmit_buffer.is_empty() {
        return false;
    }
    if self.lr_acked_packet != self.transmit_buffer.next_sequence_number {
        return false;
    }
    self.send_buffer.is_empty() && self.output_buffer.is_empty()
}
/// Takes the oldest queued outgoing packet, paired with the remote address it must be sent
/// to, or returns `None` when the output buffer is empty.
pub fn pop_output(&mut self) -> Option<(Packet, SocketAddr)> {
    let destination = self.settings.remote;
    let outgoing = self.output_buffer.pop_front()?;
    Some((outgoing, destination))
}
/// Advances the send loop and tells the caller what to wait for next.
///
/// Packets produced by this call are queued in the output buffer; the returned action says
/// whether to wait for an ACK, wait for more data, wait until an instant, or close.
pub fn next_action(&mut self, now: Instant) -> SenderAlgorithmAction {
use SenderAlgorithmAction::*;
use SenderAlgorithmStep::*;
// Close only once a close was requested AND everything is flushed; a shutdown control
// packet is queued before reporting `Close`.
if self.close && self.is_flushed() {
debug!("{:?} sending shutdown", self.settings.local_sockid);
self.send_control(ControlTypes::Shutdown, now);
return Close;
}
// An expired send timer moves the loop back to `Step1`.
if let Some(exp_time) = self.snd_timer.check_expired(now) {
self.handle_snd_timer(exp_time);
}
// Still in `Step6`: the timer has not expired yet, so keep waiting for it.
if self.step == Step6 {
return WaitUntil(self.snd_timer.next_instant());
}
// Retransmissions come first: queue one packet from the loss list and return right away,
// without touching the send timer.
if let Some(p) = self.loss_list.pop_front() {
debug!("Sending packet in loss list, seq={:?}", p.seq_number);
self.send_data(p);
return WaitForData;
}
// Nothing is queued for first transmission.
else if self.transmit_buffer.is_empty() {
return WaitForData;
}
// The last acknowledged packet is more than one congestion window behind the next sequence
// number: sending must pause until an ACK arrives.
else if self.lr_acked_packet
< self.transmit_buffer.next_sequence_number - self.congestion_control.window_size()
{
trace!("Flow window exceeded lr_acked={:?}, next_seq={:?}, window_size={}, next_seq-window={:?}",
self.lr_acked_packet,
self.transmit_buffer.next_sequence_number,
self.congestion_control.window_size(),
self.transmit_buffer.next_sequence_number - self.congestion_control.window_size());
return WaitUntilAck;
} else if let Some(p) = self.pop_transmit_buffer() {
self.send_data(p);
}
// If the packet now at the front of the transmit buffer has a sequence number divisible by
// 16, send it immediately as well.
if let Some(p) = self.pop_transmit_buffer_16n() {
self.send_data(p);
}
// Wait for the send timer, re-armed with the congestion controller's send interval.
self.step = Step6;
let period = self.congestion_control.send_interval();
self.snd_timer.set_period(period);
WaitUntil(self.snd_timer.next_instant())
}
/// Handles a data packet received by the sender: the packet is ignored and `Ok(())` returned.
fn handle_data_packet(&mut self, _packet: DataPacket, _now: Instant) -> SenderResult {
Ok(())
}
/// Dispatches a control packet from the peer to the matching handler.
///
/// Control types the sender does not act on (ACK2, drop request, keep-alive) are ignored and
/// reported as success. Packets arrive from the network, so an unsupported type must never
/// panic the sender.
fn handle_control_packet(&mut self, packet: ControlPacket, now: Instant) -> SenderResult {
    match packet.control_type {
        ControlTypes::Ack(info) => self.handle_ack_packet(now, &info),
        ControlTypes::Ack2(_) => {
            warn!("Sender received ACK2, unusual");
            Ok(())
        }
        // Drop requests are not supported. Ignore them instead of panicking, otherwise the
        // peer could bring the sender down with a single packet.
        ControlTypes::DropRequest { .. } => {
            warn!("Sender received unsupported drop request, ignoring");
            Ok(())
        }
        ControlTypes::Handshake(shake) => self.handle_handshake_packet(shake, now),
        ControlTypes::Nak(nack) => self.handle_nack_packet(nack),
        ControlTypes::Shutdown => self.handle_shutdown_packet(),
        ControlTypes::Srt(srt_packet) => self.handle_srt_control_packet(srt_packet),
        ControlTypes::KeepAlive => Ok(()),
    }
}
/// Processes an ACK from the peer.
///
/// An ACK is discarded when its `ack_number` does not advance past the last acknowledged
/// packet, or when its `ack_seq_num` is not newer than the last accepted one. Otherwise the
/// sender:
///
/// 1. records the new acknowledgement and counts the newly acknowledged packets,
/// 2. queues an ACK2 carrying the same ACK sequence number,
/// 3. updates RTT and RTT variance,
/// 4. notifies the congestion controller,
/// 5. updates the smoothed arrival rate and link capacity as `(old * 7 + new) / 8`,
/// 6. releases acknowledged packets from the send buffer and the loss list.
///
/// Fields the ACK does not carry leave the corresponding metric untouched; a missing value is
/// not treated as zero.
fn handle_ack_packet(&mut self, now: Instant, info: &AckControlInfo) -> SenderResult {
    // Stale or duplicate acknowledgement: nothing new to learn from it.
    if info.ack_number <= self.lr_acked_packet {
        return Ok(());
    }
    if info.ack_seq_num <= self.lr_acked_ack {
        return Ok(());
    }
    self.lr_acked_ack = info.ack_seq_num;

    // Count the packets acknowledged since the previous ACK before moving the marker.
    self.metrics.recvd_packets += info.ack_number - self.lr_acked_packet;
    self.lr_acked_packet = info.ack_number;

    self.send_control(ControlTypes::Ack2(info.ack_seq_num), now);

    // Keep the previous estimate when the ACK carries none, so that a zero RTT is never fed
    // to the congestion controller.
    if let Some(rtt) = info.rtt {
        self.metrics.rtt = rtt;
    }
    if let Some(rtt_var) = info.rtt_variance {
        self.metrics.rtt_var = rtt_var;
    }

    let cc_info = self.make_cc_info();
    self.congestion_control.on_ack(&cc_info);

    // Both averages are computed in a wider type: the multiplication cannot overflow and no
    // precision is lost by dividing before multiplying. A weighted average of two values never
    // exceeds the larger of them, so narrowing the result back is lossless.
    if let Some(rate) = info.packet_recv_rate {
        let smoothed = (u64::from(self.metrics.pkt_arr_rate) * 7 + u64::from(rate)) / 8;
        self.metrics.pkt_arr_rate = smoothed as u32;
    }
    if let Some(capacity) = info.est_link_cap {
        let smoothed = (i64::from(self.metrics.est_link_cap) * 7 + i64::from(capacity)) / 8;
        self.metrics.est_link_cap = smoothed as i32;
    }

    self.send_buffer
        .release_acknowledged_packets(info.ack_number);
    self.metrics.retrans_packets += self.loss_list.remove_acknowledged_packets(info.ack_number);

    Ok(())
}
/// Handles a shutdown from the peer by marking the sender as closing; `next_action` reports
/// the close once everything has been flushed.
fn handle_shutdown_packet(&mut self) -> SenderResult {
self.close = true;
Ok(())
}
/// Processes a NAK from the peer.
///
/// `nack` is the compressed loss list carried by the packet. Every reported packet that is
/// still in the send buffer is appended to the loss list for retransmission. Sequence numbers
/// no longer in the send buffer are logged and skipped: one stale entry must not discard the
/// rest of the report.
///
/// The congestion controller is notified only when this NAK queued at least one packet; it is
/// given the sequence number at the back of the loss list.
fn handle_nack_packet(&mut self, nack: Vec<u32>) -> SenderResult {
    let mut queued_any = false;

    for lost in self
        .send_buffer
        .get(decompress_loss_list(nack.iter().cloned()))
    {
        match lost {
            Ok(packet) => {
                self.loss_list.push_back(packet.clone());
                queued_any = true;
            }
            Err(n) => {
                debug!("NAK received for packet {} that's not in the buffer, maybe it's already been ACKed", n);
            }
        }
    }

    if queued_any {
        if let Some(last_packet) = self.loss_list.back() {
            let cc_info = self.make_cc_info();
            self.congestion_control
                .on_nak(last_packet.seq_number, &cc_info);
        }
    }

    Ok(())
}
/// Passes a handshake packet to the handshake state and, when that produces a reply, queues
/// the reply as a control packet.
fn handle_handshake_packet(
    &mut self,
    handshake: HandshakeControlInfo,
    now: Instant,
) -> SenderResult {
    let reply = self.handshake.handle_handshake(handshake);
    if let Some(response) = reply {
        self.send_control(response, now);
    }
    Ok(())
}
/// Handles an SRT-specific control packet.
///
/// The sender acts on none of them: a handshake request or response after the connection is
/// set up is logged, and every other SRT control packet is logged and ignored. These packets
/// come from the network, so an unsupported one must not panic the sender.
fn handle_srt_control_packet(&mut self, packet: SrtControlPacket) -> SenderResult {
    use self::SrtControlPacket::*;
    match packet {
        HandshakeRequest(_) | HandshakeResponse(_) => {
            warn!("Received handshake request or response for an already setup SRT connection")
        }
        _ => warn!("Received unsupported SRT control packet, ignoring"),
    }
    Ok(())
}
/// Snapshots the values handed to the congestion controller: smoothed link capacity and
/// arrival rate, the maximum packet size, the transmit buffer's latest sequence number and
/// the current RTT converted to a `Duration`.
fn make_cc_info(&self) -> CCData {
    let stats = &self.metrics;
    let rtt_micros = stats.rtt.as_micros() as u64;
    let newest_seq = self.transmit_buffer.latest_seqence_number();

    CCData {
        est_bandwidth: stats.est_link_cap,
        max_segment_size: self.settings.max_packet_size,
        latest_seq_num: Some(newest_seq),
        packet_arr_rate: stats.pkt_arr_rate,
        rtt: Duration::from_micros(rtt_micros),
    }
}
/// Takes the next packet from the transmit buffer, or returns `None` when it is empty.
///
/// The congestion controller is told a packet is being sent, and a copy of the packet is
/// kept in the send buffer.
fn pop_transmit_buffer(&mut self) -> Option<DataPacket> {
    let next = self.transmit_buffer.pop_front()?;

    let cc_info = self.make_cc_info();
    self.congestion_control.on_packet_sent(&cc_info);

    let retained = next.clone();
    self.send_buffer.push_back(retained);

    Some(next)
}
/// Takes the next packet from the transmit buffer only when its sequence number is a multiple
/// of 16; otherwise, or when the buffer is empty, returns `None` and leaves the buffer as is.
fn pop_transmit_buffer_16n(&mut self) -> Option<DataPacket> {
    let head_is_16n = self
        .transmit_buffer
        .front()
        .map_or(false, |head| head.seq_number % 16 == 0);

    if head_is_16n {
        self.pop_transmit_buffer()
    } else {
        None
    }
}
/// Queues a control packet for the peer in the output buffer.
///
/// The timestamp is derived from `now` by the transmit buffer, and the packet is addressed to
/// the remote socket id from the connection settings.
fn send_control(&mut self, control: ControlTypes, now: Instant) {
    let timestamp = self.transmit_buffer.timestamp_from(now);
    let outgoing = ControlPacket {
        timestamp,
        dest_sockid: self.settings.remote_sockid,
        control_type: control,
    };
    self.output_buffer.push_back(Packet::Control(outgoing));
}
/// Queues a data packet in the output buffer, from which `pop_output` hands it to the caller.
fn send_data(&mut self, p: DataPacket) {
self.output_buffer.push_back(Packet::Data(p));
}
}