mod buffer;
mod congestion_control;
mod encapsulate;
use std::{
convert::TryFrom,
time::{Duration, Instant},
};
use bytes::Bytes;
use crate::{
connection::{ConnectionSettings, ConnectionStatus},
options::*,
packet::*,
protocol::{
encryption::Encryption,
output::Output,
time::{TimeBase, Timers},
},
statistics::SocketStatistics,
};
use buffer::{AckAction, Loss, SendBuffer, SenderAction};
use congestion_control::SenderCongestionControl;
use encapsulate::Encapsulation;
#[derive(Debug)]
pub struct Sender {
time_base: TimeBase,
encapsulation: Encapsulation,
encryption: Encryption,
send_buffer: SendBuffer,
congestion_control: SenderCongestionControl,
}
impl Sender {
pub fn new(settings: ConnectionSettings) -> Self {
Self {
time_base: TimeBase::new(settings.socket_start_time),
encapsulation: Encapsulation::new(&settings),
encryption: Encryption::new(settings.cipher.clone()),
send_buffer: SendBuffer::new(&settings),
congestion_control: SenderCongestionControl::new(settings.bandwidth.clone()),
}
}
pub fn is_flushed(&self) -> bool {
self.send_buffer.is_flushed()
}
pub fn has_packets_to_send(&self) -> bool {
self.send_buffer.has_packets_to_send()
}
pub fn tx_buffered_time(&self) -> Duration {
self.send_buffer.duration()
}
pub fn tx_buffered_packets(&self) -> u64 {
u64::try_from(self.send_buffer.len()).unwrap()
}
pub fn tx_buffered_bytes(&self) -> u64 {
u64::try_from(self.send_buffer.len_bytes()).unwrap()
}
}
pub struct SenderContext<'a> {
status: &'a mut ConnectionStatus,
timers: &'a mut Timers,
output: &'a mut Output,
stats: &'a mut SocketStatistics,
sender: &'a mut Sender,
}
impl<'a> SenderContext<'a> {
pub fn new(
status: &'a mut ConnectionStatus,
timers: &'a mut Timers,
output: &'a mut Output,
stats: &'a mut SocketStatistics,
sender: &'a mut Sender,
) -> Self {
Self {
status,
timers,
output,
stats,
sender,
}
}
pub fn handle_data(&mut self, now: Instant, item: (Instant, Bytes)) {
let (time, data) = item;
let (mut packets, mut bytes) = (0, 0);
let ts = self.sender.time_base.timestamp_from(time);
for packet in self.sender.encapsulation.encapsulate(ts, data) {
if let Some((bytes_enc, packet, km)) = self.sender.encryption.encrypt(packet) {
packets += 1;
bytes += packet.payload.len() as u64;
if bytes_enc > 0 {
self.stats.tx_encrypted_data += 1;
}
if let Err((p_count, b_count)) = self.sender.send_buffer.push_data(packet) {
self.stats.tx_dropped_data += p_count.0;
self.stats.tx_dropped_bytes += b_count.0;
}
let control = km.map(ControlTypes::new_key_refresh_request);
if let Some(control) = control {
self.output.send_control(now, control);
}
}
}
let snd_period =
self.sender
.congestion_control
.on_input(now, PacketCount(packets), ByteCount(bytes));
if let Some(snd_period) = snd_period {
self.timers.update_snd_period(snd_period)
}
}
pub fn handle_ack_packet(&mut self, now: Instant, ack: Acknowledgement) {
self.stats.rx_ack += 1;
if matches!(ack, Acknowledgement::Lite(_)) {
self.stats.rx_light_ack += 1;
}
match self.sender.send_buffer.update_largest_acked_seq_number(
ack.ack_number(),
ack.full_ack_seq_number(),
ack.rtt(),
) {
Ok(AckAction {
received: _,
recovered: _,
send_ack2,
}) => {
if let Some(full_ack) = send_ack2 {
self.output.send_control(now, ControlTypes::Ack2(full_ack))
}
}
Err(_error) => {
}
}
}
pub fn handle_nak_packet(&mut self, now: Instant, nak: CompressedLossList) {
self.stats.rx_nak += 1;
for (loss, range) in self.sender.send_buffer.add_to_loss_list(nak) {
use Loss::*;
match loss {
Ignored | Added => {
self.stats.tx_loss_data += 1;
}
Dropped => {
self.stats.tx_dropped_data += 1;
self.output.send_control(
now,
ControlTypes::new_drop_request(MsgNumber::new_truncate(0), range),
)
}
}
}
}
pub fn handle_key_refresh_response(&mut self, keying_material: KeyingMaterialMessage) {
match self
.sender
.encryption
.handle_key_refresh_response(keying_material)
{
Ok(()) => {
}
Err(_err) => {
}
}
}
pub fn on_snd_event(&mut self, now: Instant, elapsed_periods: u32) {
use SenderAction::*;
let ts_now = self.sender.time_base.timestamp_from(now);
let actions = self.sender.send_buffer.next_snd_actions(
ts_now,
elapsed_periods,
self.status.should_drain_send_buffer(),
);
for action in actions {
match action {
Send(d) => {
self.stats.tx_unique_data += 1;
self.output.send_data(now, d);
}
RetransmitNak(d) => {
self.stats.tx_retransmit_data += 1;
self.output.send_data(now, d);
}
RetransmitRto(d) => {
self.stats.tx_retransmit_data += 1;
self.output.send_data(now, d);
}
Drop(_) => {}
WaitForInput => {
break;
}
WaitForAck { .. } => {
break;
}
}
}
}
}