srt_protocol/protocol/receiver/
mod.rs1mod arq;
2mod buffer;
3mod history;
4mod time;
5
6use std::{
7 ops::RangeInclusive,
8 time::{Duration, Instant},
9};
10
11use arq::AutomaticRepeatRequestAlgorithm;
12
13use crate::{
14 connection::ConnectionSettings,
15 packet::*,
16 protocol::{
17 encryption::{Decryption, DecryptionError},
18 output::Output,
19 time::Timers,
20 },
21 statistics::SocketStatistics,
22};
23
24#[derive(Debug, Eq, PartialEq)]
25pub enum DataPacketError {
26 BufferFull {
28 seq_number: SeqNumber,
29 buffer_size: usize,
30 },
31 PacketTooEarly {
33 seq_number: SeqNumber,
34 buffer_available: usize,
35 buffer_required: usize,
36 },
37 PacketTooLate {
39 seq_number: SeqNumber,
40 seq_number_0: SeqNumber,
41 },
42 DiscardedDuplicate {
44 seq_number: SeqNumber,
45 },
46 DecryptionError(DecryptionError),
47}
48
49#[derive(Debug, Eq, PartialEq)]
50pub enum DataPacketAction {
51 Received {
52 lrsn: SeqNumber,
53 recovered: bool,
54 },
55 ReceivedWithLoss(CompressedLossList),
56 ReceivedWithLightAck {
57 light_ack: SeqNumber,
58 recovered: bool,
59 },
60}
61
62#[derive(Debug)]
63pub struct Receiver {
64 pub arq: AutomaticRepeatRequestAlgorithm,
65 pub decryption: Decryption,
66}
67
68impl Receiver {
69 pub fn new(settings: ConnectionSettings) -> Self {
70 Self {
71 arq: AutomaticRepeatRequestAlgorithm::new(
72 settings.socket_start_time,
73 settings.recv_tsbpd_latency,
74 settings.too_late_packet_drop,
75 settings.init_seq_num,
76 settings.recv_buffer_size,
77 ),
78 decryption: Decryption::new(settings.cipher),
79 }
80 }
81
82 pub fn is_flushed(&self) -> bool {
83 self.arq.is_flushed()
84 }
85
86 pub fn rx_acknowledged_time(&self) -> Duration {
87 self.arq.rx_acknowledged_time()
88 }
89}
90
91pub struct ReceiverContext<'a> {
92 timers: &'a mut Timers,
93 output: &'a mut Output,
94 stats: &'a mut SocketStatistics,
95 receiver: &'a mut Receiver,
96}
97
98impl<'a> ReceiverContext<'a> {
99 pub fn new(
100 timers: &'a mut Timers,
101 output: &'a mut Output,
102 stats: &'a mut SocketStatistics,
103 receiver: &'a mut Receiver,
104 ) -> Self {
105 Self {
106 timers,
107 stats,
108 output,
109 receiver,
110 }
111 }
112
113 pub fn synchronize_clock(&mut self, now: Instant, ts: TimeStamp) {
114 if let Some(_adjustment) = self.receiver.arq.synchronize_clock(now, ts) {
115 self.stats.rx_clock_adjustments += 1;
117 }
118 }
119
120 pub fn handle_data_packet(&mut self, now: Instant, data: DataPacket) {
121 use Acknowledgement::*;
122 use ControlTypes::*;
123 let bytes = data.wire_size() as u64;
124 self.stats.rx_data += 1;
125 self.stats.rx_bytes += bytes;
126
127 let data = self
128 .receiver
129 .decryption
130 .decrypt(data)
131 .map_err(DataPacketError::DecryptionError)
132 .and_then(|(decrypted_bytes, data)| {
133 if decrypted_bytes > 0 {
134 self.stats.rx_decrypted_data += 1;
135 }
136 self.receiver.arq.handle_data_packet(now, data)
137 });
138
139 match data {
140 Ok(action) => {
141 if action.is_recovered() {
142 self.stats.rx_retransmit_data += 1;
143 } else {
144 self.stats.rx_unique_data += 1;
145 self.stats.rx_unique_bytes += bytes;
146 }
147
148 use DataPacketAction::*;
149 match action {
150 ReceivedWithLoss(loss_list) => {
151 self.output.send_control(now, Nak(loss_list));
152 }
153 ReceivedWithLightAck { light_ack, .. } => {
154 self.output.send_control(now, Ack(Lite(light_ack)));
155 }
156 _ => {}
157 }
158 }
159 Err(e) => {
160 use DataPacketError::*;
161 match e {
162 BufferFull { .. } | PacketTooEarly { .. } | PacketTooLate { .. } => {
163 self.stats.rx_dropped_data += 1;
164 self.stats.rx_dropped_bytes += bytes;
165 }
166 DecryptionError(_) => {
167 self.stats.rx_decrypt_errors += 1;
168 self.stats.rx_decrypt_error_bytes += bytes;
169 }
170 DiscardedDuplicate { .. } => {}
171 }
172 }
173 }
174 }
175
176 pub fn handle_ack2_packet(&mut self, now: Instant, seq_num: FullAckSeqNumber) {
177 self.stats.rx_ack2 += 1;
178 let rtt = self.receiver.arq.handle_ack2_packet(now, seq_num);
179 if let Some(rtt) = rtt {
180 self.timers.update_rtt(rtt);
181 self.stats.rx_ack2_errors += 1;
183 }
184 }
185
186 pub fn handle_drop_request(&mut self, now: Instant, drop: RangeInclusive<SeqNumber>) {
187 let range = *drop.start()..*drop.end() + 1;
188 let dropped = self.receiver.arq.handle_drop_request(now, range) as u64;
189 if dropped > 0 {
190 self.stats.rx_dropped_data += dropped;
192 }
193 }
194
195 pub fn handle_key_refresh_request(
196 &mut self,
197 now: Instant,
198 keying_material: KeyingMaterialMessage,
199 ) {
200 match self
201 .receiver
202 .decryption
203 .refresh_key_material(keying_material)
204 {
205 Ok(Some(response)) => {
206 self.output.send_control(
209 now,
210 ControlTypes::Srt(SrtControlPacket::KeyRefreshResponse(response)),
211 )
212 }
213 Ok(None) => {
214 }
216 Err(_err) => {
217 }
219 }
220 }
221
222 pub fn on_full_ack_event(&mut self, now: Instant) {
223 if let Some(ack) = self.receiver.arq.on_full_ack_event(now) {
224 self.output.send_control(now, ControlTypes::Ack(ack));
227 }
228 }
229
230 pub fn on_nak_event(&mut self, now: Instant) {
231 if let Some(loss_list) = self.receiver.arq.on_nak_event(now) {
232 self.output.send_control(now, ControlTypes::Nak(loss_list));
233 }
234 }
235
236 pub fn on_close_timeout(&mut self, _now: Instant) {
237 self.receiver.arq.clear()
239 }
240}
241
242impl DataPacketAction {
243 pub fn is_recovered(&self) -> bool {
244 use DataPacketAction::*;
245 match self {
246 Received { recovered, .. } | ReceivedWithLightAck { recovered, .. } => *recovered,
247 ReceivedWithLoss(_) => false,
248 }
249 }
250}