1use std::cmp::Ordering;
2use std::cmp::{max, min};
3use std::collections::VecDeque;
4use std::iter::Iterator;
5use std::net::SocketAddr;
6use std::time::{Duration, Instant};
7
8use bytes::Bytes;
9use log::{debug, info, trace, warn};
10
11use super::{TimeSpan, Timer};
12use crate::loss_compression::compress_loss_list;
13use crate::packet::{
14 AckControlInfo, ControlPacket, ControlTypes, DataPacket, HandshakeControlInfo, Packet,
15 SrtControlPacket,
16};
17use crate::protocol::handshake::Handshake;
18use crate::protocol::TimeStamp;
19use crate::{seq_number::seq_num_range, ConnectionSettings, SeqNumber};
20
21mod buffer;
22mod time;
23
24use buffer::RecvBuffer;
25
26#[derive(Debug, Clone)]
27#[allow(clippy::large_enum_variant)]
28pub enum ReceiverAlgorithmAction {
29 TimeBoundedReceive(Instant),
30 SendControl(ControlPacket, SocketAddr),
31 OutputData((Instant, Bytes)),
32 Close,
33}
34
35struct ReceiveTimers {
36 syn: Timer,
37 ack: Timer,
38 nak: Timer,
39}
40
41impl ReceiveTimers {
42 pub fn new(now: Instant) -> ReceiveTimers {
43 let syn = Duration::from_millis(10);
44 ReceiveTimers {
45 syn: Timer::new(syn, now),
46 ack: Timer::new(syn, now),
47 nak: Timer::new(syn, now),
48 }
49 }
50
51 pub fn update_rtt(&mut self, rtt: TimeSpan, rtt_var: TimeSpan) {
52 let rtt = Duration::from_micros(rtt.as_micros() as u64);
53 let rtt_var = Duration::from_micros(rtt_var.as_micros() as u64);
54 self.nak.set_period(4 * rtt + rtt_var + self.syn.period());
55 self.ack.set_period(4 * rtt + rtt_var + self.syn.period());
56 }
57
58 pub fn next_timer(&self) -> Instant {
59 min(self.ack.next_instant(), self.nak.next_instant())
60 }
61}
62
63struct LossListEntry {
64 seq_num: SeqNumber,
65
66 feedback_time: TimeStamp,
68
69 k: i32,
71}
72
73struct AckHistoryEntry {
74 ack_number: SeqNumber,
76
77 ack_seq_num: i32,
79
80 timestamp: TimeStamp,
82}
83
84pub struct Receiver {
85 settings: ConnectionSettings,
86
87 handshake: Handshake,
88
89 timers: ReceiveTimers,
90
91 control_packets: VecDeque<Packet>,
92
93 data_release: VecDeque<(Instant, Bytes)>,
94
95 rtt: TimeSpan,
98
99 rtt_variance: TimeSpan,
102
103 loss_list: Vec<LossListEntry>,
110
111 ack_history_window: Vec<AckHistoryEntry>,
116
117 packet_history_window: Vec<(SeqNumber, TimeStamp)>,
123
124 packet_pair_window: Vec<(SeqNumber, TimeSpan)>,
130
131 lrsn: SeqNumber,
133
134 next_ack: i32,
136
137 probe_time: Option<TimeStamp>,
140
141 lr_ack_acked: (i32, SeqNumber),
143
144 receive_buffer: RecvBuffer,
146
147 shutdown_flag: bool,
149}
150
151impl Receiver {
152 pub fn new(settings: ConnectionSettings, handshake: Handshake) -> Self {
153 let init_seq_num = settings.init_seq_num;
154
155 info!(
156 "Receiving started from {:?}, with latency={:?}",
157 settings.remote, settings.tsbpd_latency
158 );
159
160 Receiver {
161 settings,
162 timers: ReceiveTimers::new(settings.socket_start_time),
163 control_packets: VecDeque::new(),
164 data_release: VecDeque::new(),
165 handshake,
166 rtt: TimeSpan::from_micros(10_000),
167 rtt_variance: TimeSpan::from_micros(1_000),
168 loss_list: Vec::new(),
169 ack_history_window: Vec::new(),
170 packet_history_window: Vec::new(),
171 packet_pair_window: Vec::new(),
172 lrsn: init_seq_num, next_ack: 1,
174 probe_time: None,
175 lr_ack_acked: (0, init_seq_num),
176 receive_buffer: RecvBuffer::with(&settings),
177 shutdown_flag: false,
178 }
179 }
180
181 pub fn handle_shutdown(&mut self) {
182 self.shutdown_flag = true;
183 }
184
185 pub fn handle_packet(&mut self, now: Instant, (packet, from): (Packet, SocketAddr)) {
187 if from != self.settings.remote {
189 info!("Packet received from unknown address: {:?}", from);
190 return;
191 }
192
193 if self.settings.local_sockid != packet.dest_sockid() {
194 info!(
196 "Packet send to socket id ({}) that does not match local ({})",
197 packet.dest_sockid().0,
198 self.settings.local_sockid.0
199 );
200 return;
201 }
202
203 trace!("Received packet: {:?}", packet);
204
205 match packet {
206 Packet::Control(ctrl) => {
207 self.receive_buffer.synchronize_clock(now, ctrl.timestamp);
208
209 match ctrl.control_type {
211 ControlTypes::Ack { .. } => warn!("Receiver received ACK packet, unusual"),
212 ControlTypes::Ack2(seq_num) => self.handle_ack2(seq_num, now),
213 ControlTypes::DropRequest { .. } => unimplemented!(),
214 ControlTypes::Handshake(shake) => self.handle_handshake_packet(now, shake),
215 ControlTypes::KeepAlive => {} ControlTypes::Nak { .. } => warn!("Receiver received NAK packet, unusual"),
217 ControlTypes::Shutdown => {
218 info!("Shutdown packet received, flushing receiver...");
219 self.shutdown_flag = true;
220 } ControlTypes::Srt(srt_packet) => {
222 self.handle_srt_control_packet(srt_packet);
223 }
224 }
225 }
226 Packet::Data(data) => self.handle_data_packet(&data, now),
227 };
228 }
229
230 pub fn next_algorithm_action(&mut self, now: Instant) -> ReceiverAlgorithmAction {
232 use ReceiverAlgorithmAction::*;
233
234 if self.timers.ack.check_expired(now).is_some() {
240 self.on_ack_event(now);
241 }
242 if self.timers.nak.check_expired(now).is_some() {
243 self.on_nak_event(now);
244 }
245
246 if let Some(data) = self.pop_data(now) {
247 OutputData(data)
248 } else if let Some(Packet::Control(packet)) = self.pop_conotrol_packet() {
249 SendControl(packet, self.settings.remote)
250 } else if self.shutdown_flag && self.is_flushed() {
251 Close
252 } else {
253 TimeBoundedReceive(self.next_timer(now))
255 }
256 }
257
258 pub fn is_flushed(&self) -> bool {
259 self.receive_buffer.next_msg_ready().is_none()
260 }
261
262 fn on_ack_event(&mut self, now: Instant) {
263 trace!("Ack event hit {:?}", self.settings.local_sockid);
264 let ack_number = match self.loss_list.first() {
266 Some(i) => i.seq_num,
268 None => self.lrsn,
270 };
271
272 if ack_number == self.lr_ack_acked.1 {
275 return;
277 }
278
279 if let Some(w) = self.ack_history_window.last() {
281 assert!(w.ack_number <= ack_number);
282 }
283
284 trace!(
285 "Sending ACK; ack_num={:?}, lr_ack_acked={:?}",
286 ack_number,
287 self.lr_ack_acked.1
288 );
289
290 if let Some(&AckHistoryEntry {
291 ack_number: last_ack_number,
292 timestamp: last_timestamp,
293 ..
294 }) = self.ack_history_window.first()
295 {
296 if last_ack_number == ack_number &&
299 (self.receive_buffer.timestamp_from(now) - last_timestamp) < (self.rtt * 2)
302 {
303 return;
305 }
306 }
307
308 let ack_seq_num = self.next_ack;
310 self.next_ack += 1;
311
312 let packet_recv_rate = if self.packet_history_window.len() < 16 {
315 0
316 } else {
317 let mut last_16: Vec<_> = self.packet_history_window
320 [self.packet_history_window.len() - 16..]
321 .windows(2)
322 .map(|w| w[1].1 - w[0].1) .collect();
324 last_16.sort();
325
326 let ai = last_16[last_16.len() / 2];
328
329 let filtered: Vec<TimeSpan> = last_16
332 .iter()
333 .filter(|&&n| n / 8 < ai && n > ai / 8)
334 .cloned()
335 .collect();
336
337 if filtered.len() > 8 {
341 (1_000_000 * filtered.len()) as u64
343 / filtered
344 .iter()
345 .map(|dt| i64::from(dt.as_micros()))
346 .sum::<i64>() as u64 } else {
348 0
349 }
350 } as u32;
351
352 let est_link_cap = {
354 if self.packet_pair_window.len() < 16 {
355 0
356 } else {
357 let pi = {
361 let mut last_16: Vec<_> = self.packet_pair_window
362 [self.packet_pair_window.len() - 16..]
363 .iter()
364 .map(|&(_, time)| time)
365 .collect();
366 last_16.sort_unstable();
367
368 last_16[last_16.len() / 2]
369 };
370
371 (1. / (pi.as_secs_f64())) as i32
372 }
373 };
374
375 self.send_control(
379 now,
380 ControlTypes::Ack(AckControlInfo {
381 ack_seq_num,
382 ack_number,
383 rtt: Some(self.rtt),
384 rtt_variance: Some(self.rtt_variance),
385 buffer_available: None, packet_recv_rate: Some(packet_recv_rate),
387 est_link_cap: Some(est_link_cap),
388 }),
389 );
390
391 let ts_now = self.receive_buffer.timestamp_from(now);
393 self.ack_history_window.push(AckHistoryEntry {
394 ack_number,
395 ack_seq_num,
396 timestamp: ts_now,
397 });
398 }
399
400 fn on_nak_event(&mut self, now: Instant) {
401 self.timers.update_rtt(self.rtt, self.rtt_variance);
407
408 let ts_now = self.receive_buffer.timestamp_from(now);
415
416 let seq_nums = {
418 let mut ret = Vec::new();
419
420 let rtt = self.rtt;
421 for pak in self
422 .loss_list
423 .iter_mut()
424 .filter(|lle| lle.feedback_time < ts_now - rtt * lle.k)
425 {
426 pak.k += 1;
427 pak.feedback_time = ts_now;
428
429 ret.push(pak.seq_num);
430 }
431
432 ret
433 };
434
435 if seq_nums.is_empty() {
436 return;
437 }
438
439 self.send_nak(now, seq_nums.into_iter());
441 }
442
443 fn handle_handshake_packet(&mut self, now: Instant, control_info: HandshakeControlInfo) {
444 if let Some(c) = self.handshake.handle_handshake(control_info) {
445 self.send_control(now, c)
446 }
447 }
448
449 fn handle_srt_control_packet(&mut self, pack: SrtControlPacket) {
451 use self::SrtControlPacket::*;
452 match pack {
453 HandshakeRequest(_) | HandshakeResponse(_) => {
454 warn!("Received handshake SRT packet, HSv5 expected");
455 }
456 _ => unimplemented!(),
457 }
458 }
459
460 fn handle_ack2(&mut self, seq_num: i32, now: Instant) {
461 let id_in_wnd = match self
464 .ack_history_window
465 .as_slice()
466 .binary_search_by(|entry| entry.ack_seq_num.cmp(&seq_num))
467 {
468 Ok(i) => Some(i),
469 Err(_) => None,
470 };
471
472 if let Some(id) = id_in_wnd {
473 let AckHistoryEntry {
474 timestamp: send_timestamp,
475 ack_number,
476 ..
477 } = self.ack_history_window[id];
478
479 self.lr_ack_acked = (seq_num, ack_number);
481
482 let immediate_rtt = self.receive_buffer.timestamp_from(now) - send_timestamp;
486 self.rtt = (self.rtt * 7 + immediate_rtt) / 8;
487
488 self.rtt_variance =
490 (self.rtt_variance * 3 + (self.rtt_variance - immediate_rtt).abs()) / 4;
491
492 self.timers.update_rtt(self.rtt, self.rtt_variance);
494 } else {
495 warn!(
496 "ACK sequence number in ACK2 packet not found in ACK history: {}",
497 seq_num
498 );
499 }
500 }
501
502 fn handle_data_packet(&mut self, data: &DataPacket, now: Instant) {
503 let ts_now = self.receive_buffer.timestamp_from(now);
504
505 if data.seq_number % 16 == 0 {
510 self.probe_time = Some(ts_now)
511 } else if data.seq_number % 16 == 1 {
512 if let Some(pt) = self.probe_time {
514 self.packet_pair_window.push((data.seq_number, ts_now - pt));
516
517 self.probe_time = None
519 }
520 }
521 self.packet_history_window.push((data.seq_number, ts_now));
523
524 match data.seq_number.cmp(&self.lrsn) {
530 Ordering::Greater => {
531 for i in seq_num_range(self.lrsn, data.seq_number) {
533 self.loss_list.push(LossListEntry {
534 seq_num: i,
535 feedback_time: ts_now,
536 k: 2,
538 })
539 }
540
541 self.send_nak(now, seq_num_range(self.lrsn, data.seq_number));
542 }
543 Ordering::Less => {
546 match self.loss_list[..].binary_search_by(|ll| ll.seq_num.cmp(&data.seq_number)) {
547 Ok(i) => {
548 self.loss_list.remove(i);
549 }
550 Err(_) => {
551 debug!(
552 "Packet received that's not in the loss list: {:?}, loss_list={:?}",
553 data.seq_number,
554 self.loss_list
555 .iter()
556 .map(|ll| ll.seq_num.as_raw())
557 .collect::<Vec<_>>()
558 );
559 }
560 };
561 }
562 Ordering::Equal => {}
563 }
564
565 self.lrsn = max(data.seq_number + 1, self.lrsn);
567
568 if self.receive_buffer.next_release() > data.seq_number {
570 debug!("Received packet {:?} twice", data.seq_number);
571 return;
572 }
573
574 self.receive_buffer.add(data.clone());
575 }
576
577 fn send_nak(&mut self, now: Instant, lost_seq_nums: impl Iterator<Item = SeqNumber>) {
579 let vec: Vec<_> = lost_seq_nums.collect();
580 debug!("Sending NAK for={:?}", vec);
581
582 self.send_control(
583 now,
584 ControlTypes::Nak(compress_loss_list(vec.iter().cloned()).collect()),
585 );
586 }
587
588 fn pop_data(&mut self, now: Instant) -> Option<(Instant, Bytes)> {
589 while let Some(d) = self.receive_buffer.next_msg_tsbpd(now) {
591 self.data_release.push_back(d);
592 }
593
594 let _dropped = self.receive_buffer.drop_too_late_packets(now);
597
598 self.data_release.pop_front()
599 }
600
601 fn pop_conotrol_packet(&mut self) -> Option<Packet> {
602 self.control_packets.pop_front()
603 }
604
605 fn next_timer(&self, now: Instant) -> Instant {
606 match self.receive_buffer.next_message_release_time(now) {
607 Some(next_rel_time) => min(self.timers.next_timer(), next_rel_time),
608 None => self.timers.next_timer(),
609 }
610 }
611
612 fn send_control(&mut self, now: Instant, control: ControlTypes) {
613 self.control_packets
614 .push_back(Packet::Control(ControlPacket {
615 timestamp: self.receive_buffer.timestamp_from(now),
616 dest_sockid: self.settings.remote_sockid,
617 control_type: control,
618 }));
619 }
620}