mod buffers;
mod congestion_control;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::time::{Duration, Instant};
use bytes::Bytes;
use log::debug;
use log::{trace, warn};
use super::TimeSpan;
use crate::loss_compression::decompress_loss_list;
use crate::packet::{AckControlInfo, ControlTypes, HandshakeControlInfo, SrtControlPacket};
use crate::protocol::handshake::Handshake;
use crate::protocol::Timer;
use crate::{ConnectionSettings, ControlPacket, DataPacket, Packet, SeqNumber};
use buffers::*;
use congestion_control::{LiveDataRate, SenderCongestionControl};
#[derive(Debug)]
pub enum SenderError {}
pub type SenderResult = Result<(), SenderError>;
#[derive(Debug, Clone, Copy)]
pub struct SenderMetrics {
pub rtt: TimeSpan,
pub rtt_var: TimeSpan,
pub pkt_arr_rate: u32,
pub est_link_cap: i32,
pub lost_packets: u32,
pub retrans_packets: u32,
pub recvd_packets: u32,
}
impl SenderMetrics {
pub fn new() -> Self {
Self {
rtt: TimeSpan::from_micros(10_000),
rtt_var: TimeSpan::from_micros(0),
pkt_arr_rate: 0,
est_link_cap: 0,
lost_packets: 0,
retrans_packets: 0,
recvd_packets: 0,
}
}
}
#[derive(Debug, Clone)]
pub enum SenderAlgorithmAction {
WaitUntilAck,
WaitForData,
WaitUntil(Instant),
Close,
}
#[derive(Debug, Clone, PartialEq)]
pub enum SenderAlgorithmStep {
Step1,
Step6,
}
pub struct Sender {
settings: ConnectionSettings,
handshake: Handshake,
congestion_control: SenderCongestionControl,
metrics: SenderMetrics,
send_buffer: SendBuffer,
output_buffer: VecDeque<Packet>,
transmit_buffer: TransmitBuffer,
loss_list: LossList,
lr_acked_packet: SeqNumber,
lr_acked_ack: i32,
step: SenderAlgorithmStep,
snd_timer: Timer,
close_requested: bool,
shutdown_sent: bool,
}
impl Default for SenderMetrics {
fn default() -> Self {
Self::new()
}
}
impl Sender {
pub fn new(settings: ConnectionSettings, handshake: Handshake) -> Self {
Self {
settings: settings.clone(),
handshake,
congestion_control: SenderCongestionControl::new(LiveDataRate::Unlimited, None),
metrics: SenderMetrics::new(),
send_buffer: SendBuffer::new(&settings),
loss_list: LossList::new(&settings),
lr_acked_packet: settings.init_send_seq_num,
lr_acked_ack: -1,
output_buffer: VecDeque::new(),
transmit_buffer: TransmitBuffer::new(&settings),
step: SenderAlgorithmStep::Step1,
snd_timer: Timer::new(Duration::from_millis(1), settings.socket_start_time),
close_requested: false,
shutdown_sent: false,
}
}
pub fn settings(&self) -> &ConnectionSettings {
&self.settings
}
pub fn handle_close(&mut self) {
self.close_requested = true;
}
pub fn handle_data(&mut self, data: (Instant, Bytes), now: Instant) {
let data_length = data.1.len();
let packet_count = self.transmit_buffer.push_message(data);
self.congestion_control
.on_input(now, packet_count, data_length);
}
fn handle_snd_timer(&mut self, now: Instant) {
self.snd_timer.reset(now);
self.step = SenderAlgorithmStep::Step1;
}
pub fn handle_packet(
&mut self,
(packet, from): (Packet, SocketAddr),
now: Instant,
) -> SenderResult {
if from != self.settings.remote {
return Ok(());
}
debug!("Received packet {:?}", packet);
match packet {
Packet::Control(control) => self.handle_control_packet(control, now),
Packet::Data(data) => self.handle_data_packet(data, now),
}
}
pub fn is_flushed(&self) -> bool {
trace!("{:?} Checking is flushed: ll.len()={}, tb.len()={}, lrap={}, nsn={}, sb.len()={}, ob.len()={}", self.settings.local_sockid, self.loss_list.len(),
self.transmit_buffer.len(), self.lr_acked_packet, self.transmit_buffer.next_sequence_number, self.send_buffer.len(), self.output_buffer.len());
self.loss_list.is_empty()
&& self.transmit_buffer.is_empty()
&& self.lr_acked_packet == self.transmit_buffer.next_sequence_number
&& self.output_buffer.is_empty()
}
pub fn pop_output(&mut self) -> Option<(Packet, SocketAddr)> {
let to = self.settings.remote;
self.output_buffer
.pop_front()
.map(move |packet| (packet, to))
}
pub fn next_action(&mut self, now: Instant) -> SenderAlgorithmAction {
use SenderAlgorithmAction::*;
use SenderAlgorithmStep::*;
if self.close_requested && self.is_flushed() {
if !self.shutdown_sent {
debug!("{:?} sending shutdown", self.settings.local_sockid);
self.send_control(ControlTypes::Shutdown, now);
self.shutdown_sent = true;
}
return Close;
}
if let Some(exp_time) = self.snd_timer.check_expired(now) {
self.handle_snd_timer(exp_time);
}
if self.step == Step6 {
return WaitUntil(self.snd_timer.next_instant());
}
if let Some(p) = self.loss_list.pop_front() {
debug!("Sending packet in loss list, seq={:?}", p.seq_number);
self.send_data(p);
return WaitForData;
}
else if self.transmit_buffer.is_empty() && !self.close_requested {
return WaitForData;
}
else if self.lr_acked_packet
< self.transmit_buffer.next_sequence_number - self.congestion_control.window_size()
{
trace!("Flow window exceeded lr_acked={:?}, next_seq={:?}, window_size={}, next_seq-window={:?}",
self.lr_acked_packet,
self.transmit_buffer.next_sequence_number,
self.congestion_control.window_size(),
self.transmit_buffer.next_sequence_number - self.congestion_control.window_size());
return WaitUntilAck;
} else if let Some(p) = self.pop_transmit_buffer() {
self.send_data(p);
} else if self.close_requested {
if let Some(dp) = self.send_buffer.front().cloned() {
self.send_data(dp);
}
}
if let Some(p) = self.pop_transmit_buffer_16n() {
self.send_data(p);
}
self.step = Step6;
self.snd_timer
.set_period(self.congestion_control.snd_period());
WaitUntil(self.snd_timer.next_instant())
}
fn handle_data_packet(&mut self, _packet: DataPacket, _now: Instant) -> SenderResult {
Ok(())
}
fn handle_control_packet(&mut self, packet: ControlPacket, now: Instant) -> SenderResult {
match packet.control_type {
ControlTypes::Ack(info) => self.handle_ack_packet(now, &info),
ControlTypes::Ack2(_) => {
warn!("Sender received ACK2, unusual");
Ok(())
}
ControlTypes::DropRequest { .. } => unimplemented!(),
ControlTypes::Handshake(shake) => self.handle_handshake_packet(shake, now),
ControlTypes::Nak(nack) => self.handle_nack_packet(nack),
ControlTypes::Shutdown => self.handle_shutdown_packet(),
ControlTypes::Srt(srt_packet) => self.handle_srt_control_packet(srt_packet),
ControlTypes::KeepAlive => Ok(()),
}
}
fn handle_ack_packet(&mut self, now: Instant, info: &AckControlInfo) -> SenderResult {
if info.ack_number < self.lr_acked_packet {
return Ok(());
}
if info.ack_seq_num <= self.lr_acked_ack {
return Ok(());
}
self.lr_acked_ack = info.ack_seq_num;
self.metrics.recvd_packets += info.ack_number - self.lr_acked_packet;
self.lr_acked_packet = info.ack_number;
self.send_control(ControlTypes::Ack2(info.ack_seq_num), now);
self.metrics.rtt = info.rtt.unwrap_or_else(|| TimeSpan::from_micros(0));
self.metrics.rtt_var = info
.rtt_variance
.unwrap_or_else(|| TimeSpan::from_micros(0));
self.congestion_control.on_ack();
self.metrics.pkt_arr_rate =
self.metrics.pkt_arr_rate / 8 * 7 + info.packet_recv_rate.unwrap_or(0) / 8;
self.metrics.est_link_cap =
(self.metrics.est_link_cap * 7 + info.est_link_cap.unwrap_or(0)) / 8;
self.send_buffer
.release_acknowledged_packets(info.ack_number);
self.metrics.retrans_packets += self.loss_list.remove_acknowledged_packets(info.ack_number);
Ok(())
}
fn handle_shutdown_packet(&mut self) -> SenderResult {
self.close_requested = true;
Ok(())
}
fn handle_nack_packet(&mut self, nack: Vec<u32>) -> SenderResult {
for lost in self
.send_buffer
.get(decompress_loss_list(nack.iter().cloned()))
{
let packet = match lost {
Ok(p) => p,
Err(n) => {
debug!("NAK received for packet {} that's not in the buffer, maybe it's already been ACKed", n);
return Ok(());
}
};
if packet.seq_number < self.lr_acked_packet {
continue;
}
self.loss_list.push_back(packet.clone());
}
if let Some(last_packet) = self.loss_list.back() {
self.congestion_control.on_nak(last_packet.seq_number);
}
Ok(())
}
fn handle_handshake_packet(
&mut self,
handshake: HandshakeControlInfo,
now: Instant,
) -> SenderResult {
if let Some(control_type) = self.handshake.handle_handshake(handshake) {
self.send_control(control_type, now);
}
Ok(())
}
fn handle_srt_control_packet(&mut self, packet: SrtControlPacket) -> SenderResult {
use self::SrtControlPacket::*;
match packet {
HandshakeRequest(_) | HandshakeResponse(_) => {
warn!("Received handshake request or response for an already setup SRT connection")
}
_ => unimplemented!(),
}
Ok(())
}
fn pop_transmit_buffer(&mut self) -> Option<DataPacket> {
let packet = self.transmit_buffer.pop_front()?;
self.congestion_control.on_packet_sent();
self.send_buffer.push_back(packet.clone());
Some(packet)
}
fn pop_transmit_buffer_16n(&mut self) -> Option<DataPacket> {
match self.transmit_buffer.front().map(|p| p.seq_number % 16) {
Some(0) => self.pop_transmit_buffer(),
_ => None,
}
}
fn send_control(&mut self, control: ControlTypes, now: Instant) {
self.output_buffer.push_back(Packet::Control(ControlPacket {
timestamp: self.transmit_buffer.timestamp_from(now),
dest_sockid: self.settings.remote_sockid,
control_type: control,
}));
}
fn send_data(&mut self, p: DataPacket) {
self.output_buffer.push_back(Packet::Data(p));
}
}