mod arq;
mod buffer;
mod history;
mod time;
use std::{
ops::RangeInclusive,
time::{Duration, Instant},
};
use arq::AutomaticRepeatRequestAlgorithm;
use crate::{
connection::ConnectionSettings,
packet::*,
protocol::{
encryption::{Decryption, DecryptionError},
output::Output,
time::Timers,
},
statistics::SocketStatistics,
};
#[derive(Debug, Eq, PartialEq)]
pub enum DataPacketError {
BufferFull {
seq_number: SeqNumber,
buffer_size: usize,
},
PacketTooEarly {
seq_number: SeqNumber,
buffer_available: usize,
buffer_required: usize,
},
PacketTooLate {
seq_number: SeqNumber,
seq_number_0: SeqNumber,
},
DiscardedDuplicate {
seq_number: SeqNumber,
},
DecryptionError(DecryptionError),
}
#[derive(Debug, Eq, PartialEq)]
pub enum DataPacketAction {
Received {
lrsn: SeqNumber,
recovered: bool,
},
ReceivedWithLoss(CompressedLossList),
ReceivedWithLightAck {
light_ack: SeqNumber,
recovered: bool,
},
}
#[derive(Debug)]
pub struct Receiver {
pub arq: AutomaticRepeatRequestAlgorithm,
pub decryption: Decryption,
}
impl Receiver {
pub fn new(settings: ConnectionSettings) -> Self {
Self {
arq: AutomaticRepeatRequestAlgorithm::new(
settings.socket_start_time,
settings.recv_tsbpd_latency,
settings.too_late_packet_drop,
settings.init_seq_num,
settings.recv_buffer_size,
),
decryption: Decryption::new(settings.cipher),
}
}
pub fn is_flushed(&self) -> bool {
self.arq.is_flushed()
}
pub fn rx_acknowledged_time(&self) -> Duration {
self.arq.rx_acknowledged_time()
}
}
pub struct ReceiverContext<'a> {
timers: &'a mut Timers,
output: &'a mut Output,
stats: &'a mut SocketStatistics,
receiver: &'a mut Receiver,
}
impl<'a> ReceiverContext<'a> {
pub fn new(
timers: &'a mut Timers,
output: &'a mut Output,
stats: &'a mut SocketStatistics,
receiver: &'a mut Receiver,
) -> Self {
Self {
timers,
stats,
output,
receiver,
}
}
pub fn synchronize_clock(&mut self, now: Instant, ts: TimeStamp) {
if let Some(_adjustment) = self.receiver.arq.synchronize_clock(now, ts) {
self.stats.rx_clock_adjustments += 1;
}
}
pub fn handle_data_packet(&mut self, now: Instant, data: DataPacket) {
use Acknowledgement::*;
use ControlTypes::*;
let bytes = data.wire_size() as u64;
self.stats.rx_data += 1;
self.stats.rx_bytes += bytes;
let data = self
.receiver
.decryption
.decrypt(data)
.map_err(DataPacketError::DecryptionError)
.and_then(|(decrypted_bytes, data)| {
if decrypted_bytes > 0 {
self.stats.rx_decrypted_data += 1;
}
self.receiver.arq.handle_data_packet(now, data)
});
match data {
Ok(action) => {
if action.is_recovered() {
self.stats.rx_retransmit_data += 1;
} else {
self.stats.rx_unique_data += 1;
self.stats.rx_unique_bytes += bytes;
}
use DataPacketAction::*;
match action {
ReceivedWithLoss(loss_list) => {
self.output.send_control(now, Nak(loss_list));
}
ReceivedWithLightAck { light_ack, .. } => {
self.output.send_control(now, Ack(Lite(light_ack)));
}
_ => {}
}
}
Err(e) => {
use DataPacketError::*;
match e {
BufferFull { .. } | PacketTooEarly { .. } | PacketTooLate { .. } => {
self.stats.rx_dropped_data += 1;
self.stats.rx_dropped_bytes += bytes;
}
DecryptionError(_) => {
self.stats.rx_decrypt_errors += 1;
self.stats.rx_decrypt_error_bytes += bytes;
}
DiscardedDuplicate { .. } => {}
}
}
}
}
pub fn handle_ack2_packet(&mut self, now: Instant, seq_num: FullAckSeqNumber) {
self.stats.rx_ack2 += 1;
let rtt = self.receiver.arq.handle_ack2_packet(now, seq_num);
if let Some(rtt) = rtt {
self.timers.update_rtt(rtt);
self.stats.rx_ack2_errors += 1;
}
}
pub fn handle_drop_request(&mut self, now: Instant, drop: RangeInclusive<SeqNumber>) {
let range = *drop.start()..*drop.end() + 1;
let dropped = self.receiver.arq.handle_drop_request(now, range) as u64;
if dropped > 0 {
self.stats.rx_dropped_data += dropped;
}
}
pub fn handle_key_refresh_request(
&mut self,
now: Instant,
keying_material: KeyingMaterialMessage,
) {
match self
.receiver
.decryption
.refresh_key_material(keying_material)
{
Ok(Some(response)) => {
self.output.send_control(
now,
ControlTypes::Srt(SrtControlPacket::KeyRefreshResponse(response)),
)
}
Ok(None) => {
}
Err(_err) => {
}
}
}
pub fn on_full_ack_event(&mut self, now: Instant) {
if let Some(ack) = self.receiver.arq.on_full_ack_event(now) {
self.output.send_control(now, ControlTypes::Ack(ack));
}
}
pub fn on_nak_event(&mut self, now: Instant) {
if let Some(loss_list) = self.receiver.arq.on_nak_event(now) {
self.output.send_control(now, ControlTypes::Nak(loss_list));
}
}
pub fn on_close_timeout(&mut self, _now: Instant) {
self.receiver.arq.clear()
}
}
impl DataPacketAction {
pub fn is_recovered(&self) -> bool {
use DataPacketAction::*;
match self {
Received { recovered, .. } | ReceivedWithLightAck { recovered, .. } => *recovered,
ReceivedWithLoss(_) => false,
}
}
}