Skip to main content

srt/protocol/receiver/
mod.rs

1use std::cmp::Ordering;
2use std::cmp::{max, min};
3use std::collections::VecDeque;
4use std::iter::Iterator;
5use std::net::SocketAddr;
6use std::time::{Duration, Instant};
7
8use bytes::Bytes;
9use log::{debug, info, trace, warn};
10
11use super::{TimeSpan, Timer};
12use crate::loss_compression::compress_loss_list;
13use crate::packet::{
14    AckControlInfo, ControlPacket, ControlTypes, DataPacket, HandshakeControlInfo, Packet,
15    SrtControlPacket,
16};
17use crate::protocol::handshake::Handshake;
18use crate::protocol::TimeStamp;
19use crate::{seq_number::seq_num_range, ConnectionSettings, SeqNumber};
20
21mod buffer;
22mod time;
23
24use buffer::RecvBuffer;
25
26#[derive(Debug, Clone)]
27#[allow(clippy::large_enum_variant)]
28pub enum ReceiverAlgorithmAction {
29    TimeBoundedReceive(Instant),
30    SendControl(ControlPacket, SocketAddr),
31    OutputData((Instant, Bytes)),
32    Close,
33}
34
35struct ReceiveTimers {
36    syn: Timer,
37    ack: Timer,
38    nak: Timer,
39}
40
41impl ReceiveTimers {
42    pub fn new(now: Instant) -> ReceiveTimers {
43        let syn = Duration::from_millis(10);
44        ReceiveTimers {
45            syn: Timer::new(syn, now),
46            ack: Timer::new(syn, now),
47            nak: Timer::new(syn, now),
48        }
49    }
50
51    pub fn update_rtt(&mut self, rtt: TimeSpan, rtt_var: TimeSpan) {
52        let rtt = Duration::from_micros(rtt.as_micros() as u64);
53        let rtt_var = Duration::from_micros(rtt_var.as_micros() as u64);
54        self.nak.set_period(4 * rtt + rtt_var + self.syn.period());
55        self.ack.set_period(4 * rtt + rtt_var + self.syn.period());
56    }
57
58    pub fn next_timer(&self) -> Instant {
59        min(self.ack.next_instant(), self.nak.next_instant())
60    }
61}
62
63struct LossListEntry {
64    seq_num: SeqNumber,
65
66    // last time it was feed into NAK
67    feedback_time: TimeStamp,
68
69    // the number of times this entry has been fed back into NAK
70    k: i32,
71}
72
73struct AckHistoryEntry {
74    /// the highest packet sequence number received that this ACK packet ACKs + 1
75    ack_number: SeqNumber,
76
77    /// the ack sequence number
78    ack_seq_num: i32,
79
80    /// timestamp that it was sent at
81    timestamp: TimeStamp,
82}
83
84pub struct Receiver {
85    settings: ConnectionSettings,
86
87    handshake: Handshake,
88
89    timers: ReceiveTimers,
90
91    control_packets: VecDeque<Packet>,
92
93    data_release: VecDeque<(Instant, Bytes)>,
94
95    /// the round trip time, in microseconds
96    /// is calculated each ACK2
97    rtt: TimeSpan,
98
99    /// the round trip time variance, in microseconds
100    /// is calculated each ACK2
101    rtt_variance: TimeSpan,
102
103    /// https://tools.ietf.org/html/draft-gg-udt-03#page-12
104    /// Receiver's Loss List: It is a list of tuples whose values include:
105    /// the sequence numbers of detected lost data packets, the latest
106    /// feedback time of each tuple, and a parameter k that is the number
107    /// of times each one has been fed back in NAK. Values are stored in
108    /// the increasing order of packet sequence numbers.
109    loss_list: Vec<LossListEntry>,
110
111    /// https://tools.ietf.org/html/draft-gg-udt-03#page-12
112    /// ACK History Window: A circular array of each sent ACK and the time
113    /// it is sent out. The most recent value will overwrite the oldest
114    /// one if no more free space in the array.
115    ack_history_window: Vec<AckHistoryEntry>,
116
117    /// https://tools.ietf.org/html/draft-gg-udt-03#page-12
118    /// PKT History Window: A circular array that records the arrival time
119    /// of each data packet.
120    ///
121    /// First is sequence number, second is timestamp
122    packet_history_window: Vec<(SeqNumber, TimeStamp)>,
123
124    /// https://tools.ietf.org/html/draft-gg-udt-03#page-12
125    /// Packet Pair Window: A circular array that records the time
126    /// interval between each probing packet pair.
127    ///
128    /// First is seq num, second is time
129    packet_pair_window: Vec<(SeqNumber, TimeSpan)>,
130
131    /// the highest received packet sequence number + 1
132    lrsn: SeqNumber,
133
134    /// The ID of the next ack packet
135    next_ack: i32,
136
137    /// The timestamp of the probe time
138    /// Used to see duration between packets
139    probe_time: Option<TimeStamp>,
140
141    /// The ACK sequence number of the largest ACK2 received, and the ack number
142    lr_ack_acked: (i32, SeqNumber),
143
144    /// The buffer
145    receive_buffer: RecvBuffer,
146
147    /// Shutdown flag. This is set so when the buffer is flushed, it returns Async::Ready(None)
148    shutdown_flag: bool,
149}
150
151impl Receiver {
152    pub fn new(settings: ConnectionSettings, handshake: Handshake) -> Self {
153        let init_seq_num = settings.init_seq_num;
154
155        info!(
156            "Receiving started from {:?}, with latency={:?}",
157            settings.remote, settings.tsbpd_latency
158        );
159
160        Receiver {
161            settings,
162            timers: ReceiveTimers::new(settings.socket_start_time),
163            control_packets: VecDeque::new(),
164            data_release: VecDeque::new(),
165            handshake,
166            rtt: TimeSpan::from_micros(10_000),
167            rtt_variance: TimeSpan::from_micros(1_000),
168            loss_list: Vec::new(),
169            ack_history_window: Vec::new(),
170            packet_history_window: Vec::new(),
171            packet_pair_window: Vec::new(),
172            lrsn: init_seq_num, // at start, we have received everything until the first packet, exclusive (aka nothing)
173            next_ack: 1,
174            probe_time: None,
175            lr_ack_acked: (0, init_seq_num),
176            receive_buffer: RecvBuffer::with(&settings),
177            shutdown_flag: false,
178        }
179    }
180
181    pub fn handle_shutdown(&mut self) {
182        self.shutdown_flag = true;
183    }
184
185    // handles an incoming a packet
186    pub fn handle_packet(&mut self, now: Instant, (packet, from): (Packet, SocketAddr)) {
187        // We don't care about packets from elsewhere
188        if from != self.settings.remote {
189            info!("Packet received from unknown address: {:?}", from);
190            return;
191        }
192
193        if self.settings.local_sockid != packet.dest_sockid() {
194            // packet isn't applicable
195            info!(
196                "Packet send to socket id ({}) that does not match local ({})",
197                packet.dest_sockid().0,
198                self.settings.local_sockid.0
199            );
200            return;
201        }
202
203        trace!("Received packet: {:?}", packet);
204
205        match packet {
206            Packet::Control(ctrl) => {
207                self.receive_buffer.synchronize_clock(now, ctrl.timestamp);
208
209                // handle the control packet
210                match ctrl.control_type {
211                    ControlTypes::Ack { .. } => warn!("Receiver received ACK packet, unusual"),
212                    ControlTypes::Ack2(seq_num) => self.handle_ack2(seq_num, now),
213                    ControlTypes::DropRequest { .. } => unimplemented!(),
214                    ControlTypes::Handshake(shake) => self.handle_handshake_packet(now, shake),
215                    ControlTypes::KeepAlive => {} // TODO: actually reset EXP etc
216                    ControlTypes::Nak { .. } => warn!("Receiver received NAK packet, unusual"),
217                    ControlTypes::Shutdown => {
218                        info!("Shutdown packet received, flushing receiver...");
219                        self.shutdown_flag = true;
220                    } // end of stream
221                    ControlTypes::Srt(srt_packet) => {
222                        self.handle_srt_control_packet(srt_packet);
223                    }
224                }
225            }
226            Packet::Data(data) => self.handle_data_packet(&data, now),
227        };
228    }
229
230    /// 6.2 The Receiver's Algorithm
231    pub fn next_algorithm_action(&mut self, now: Instant) -> ReceiverAlgorithmAction {
232        use ReceiverAlgorithmAction::*;
233
234        //   Data Sending Algorithm:
235        //   1) Query the system time to check if ACK, NAK, or EXP timer has
236        //      expired. If there is any, process the event (as described below
237        //      in this section) and reset the associated time variables. For
238        //      ACK, also check the ACK packet interval.
239        if self.timers.ack.check_expired(now).is_some() {
240            self.on_ack_event(now);
241        }
242        if self.timers.nak.check_expired(now).is_some() {
243            self.on_nak_event(now);
244        }
245
246        if let Some(data) = self.pop_data(now) {
247            OutputData(data)
248        } else if let Some(Packet::Control(packet)) = self.pop_conotrol_packet() {
249            SendControl(packet, self.settings.remote)
250        } else if self.shutdown_flag && self.is_flushed() {
251            Close
252        } else {
253            // 2) Start time bounded UDP receiving. If no packet arrives, go to 1).
254            TimeBoundedReceive(self.next_timer(now))
255        }
256    }
257
258    pub fn is_flushed(&self) -> bool {
259        self.receive_buffer.next_msg_ready().is_none()
260    }
261
262    fn on_ack_event(&mut self, now: Instant) {
263        trace!("Ack event hit {:?}", self.settings.local_sockid);
264        // get largest inclusive received packet number
265        let ack_number = match self.loss_list.first() {
266            // There is an element in the loss list
267            Some(i) => i.seq_num,
268            // No elements, use lrsn, as it's already exclusive
269            None => self.lrsn,
270        };
271
272        // 2) If (a) the ACK number equals to the largest ACK number ever
273        //    acknowledged by ACK2
274        if ack_number == self.lr_ack_acked.1 {
275            // stop (do not send this ACK).
276            return;
277        }
278
279        // make sure this ACK number is greater or equal to a one sent previously
280        if let Some(w) = self.ack_history_window.last() {
281            assert!(w.ack_number <= ack_number);
282        }
283
284        trace!(
285            "Sending ACK; ack_num={:?}, lr_ack_acked={:?}",
286            ack_number,
287            self.lr_ack_acked.1
288        );
289
290        if let Some(&AckHistoryEntry {
291            ack_number: last_ack_number,
292            timestamp: last_timestamp,
293            ..
294        }) = self.ack_history_window.first()
295        {
296            // or, (b) it is equal to the ACK number in the
297            // last ACK
298            if last_ack_number == ack_number &&
299                // and the time interval between this two ACK packets is
300                // less than 2 RTTs,
301                (self.receive_buffer.timestamp_from(now) - last_timestamp) < (self.rtt * 2)
302            {
303                // stop (do not send this ACK).
304                return;
305            }
306        }
307
308        // 3) Assign this ACK a unique increasing ACK sequence number.
309        let ack_seq_num = self.next_ack;
310        self.next_ack += 1;
311
312        // 4) Calculate the packet arrival speed according to the following
313        // algorithm:
314        let packet_recv_rate = if self.packet_history_window.len() < 16 {
315            0
316        } else {
317            // Calculate the median value of the last 16 packet arrival
318            // intervals (AI) using the values stored in PKT History Window.
319            let mut last_16: Vec<_> = self.packet_history_window
320                [self.packet_history_window.len() - 16..]
321                .windows(2)
322                .map(|w| w[1].1 - w[0].1) // delta time
323                .collect();
324            last_16.sort();
325
326            // the median AI
327            let ai = last_16[last_16.len() / 2];
328
329            // In these 16 values, remove those either greater than AI*8 or
330            // less than AI/8.
331            let filtered: Vec<TimeSpan> = last_16
332                .iter()
333                .filter(|&&n| n / 8 < ai && n > ai / 8)
334                .cloned()
335                .collect();
336
337            // If more than 8 values are left, calculate the
338            // average of the left values AI', and the packet arrival speed is
339            // 1/AI' (number of packets per second). Otherwise, return 0.
340            if filtered.len() > 8 {
341                // 1e6 / (sum / len) = len * 1e6 / sum
342                (1_000_000 * filtered.len()) as u64
343                    / filtered
344                        .iter()
345                        .map(|dt| i64::from(dt.as_micros()))
346                        .sum::<i64>() as u64 // all these dts are garunteed to be positive
347            } else {
348                0
349            }
350        } as u32;
351
352        // 5) Calculate the estimated link capacity according to the following algorithm:
353        let est_link_cap = {
354            if self.packet_pair_window.len() < 16 {
355                0
356            } else {
357                //  Calculate the median value of the last 16 packet pair
358                //  intervals (PI) using the values in Packet Pair Window, and the
359                //  link capacity is 1/PI (number of packets per second).
360                let pi = {
361                    let mut last_16: Vec<_> = self.packet_pair_window
362                        [self.packet_pair_window.len() - 16..]
363                        .iter()
364                        .map(|&(_, time)| time)
365                        .collect();
366                    last_16.sort_unstable();
367
368                    last_16[last_16.len() / 2]
369                };
370
371                (1. / (pi.as_secs_f64())) as i32
372            }
373        };
374
375        // Pack the ACK packet with RTT, RTT Variance, and flow window size (available
376        // receiver buffer size).
377
378        self.send_control(
379            now,
380            ControlTypes::Ack(AckControlInfo {
381                ack_seq_num,
382                ack_number,
383                rtt: Some(self.rtt),
384                rtt_variance: Some(self.rtt_variance),
385                buffer_available: None, // TODO: add this
386                packet_recv_rate: Some(packet_recv_rate),
387                est_link_cap: Some(est_link_cap),
388            }),
389        );
390
391        // add it to the ack history
392        let ts_now = self.receive_buffer.timestamp_from(now);
393        self.ack_history_window.push(AckHistoryEntry {
394            ack_number,
395            ack_seq_num,
396            timestamp: ts_now,
397        });
398    }
399
400    fn on_nak_event(&mut self, now: Instant) {
401        // reset NAK timer, rtt and variance are in us, so convert to ns
402
403        // NAK is used to trigger a negative acknowledgement (NAK). Its period
404        // is dynamically updated to 4 * RTT_+ RTTVar + SYN, where RTTVar is the
405        // variance of RTT samples.
406        self.timers.update_rtt(self.rtt, self.rtt_variance);
407
408        // Search the receiver's loss list, find out all those sequence numbers
409        // whose last feedback time is k*RTT before, where k is initialized as 2
410        // and increased by 1 each time the number is fed back. Compress
411        // (according to section 6.4) and send these numbers back to the sender
412        // in an NAK packet.
413
414        let ts_now = self.receive_buffer.timestamp_from(now);
415
416        // increment k and change feedback time, returning sequence numbers
417        let seq_nums = {
418            let mut ret = Vec::new();
419
420            let rtt = self.rtt;
421            for pak in self
422                .loss_list
423                .iter_mut()
424                .filter(|lle| lle.feedback_time < ts_now - rtt * lle.k)
425            {
426                pak.k += 1;
427                pak.feedback_time = ts_now;
428
429                ret.push(pak.seq_num);
430            }
431
432            ret
433        };
434
435        if seq_nums.is_empty() {
436            return;
437        }
438
439        // send the nak
440        self.send_nak(now, seq_nums.into_iter());
441    }
442
443    fn handle_handshake_packet(&mut self, now: Instant, control_info: HandshakeControlInfo) {
444        if let Some(c) = self.handshake.handle_handshake(control_info) {
445            self.send_control(now, c)
446        }
447    }
448
449    // handles a SRT control packet
450    fn handle_srt_control_packet(&mut self, pack: SrtControlPacket) {
451        use self::SrtControlPacket::*;
452        match pack {
453            HandshakeRequest(_) | HandshakeResponse(_) => {
454                warn!("Received handshake SRT packet, HSv5 expected");
455            }
456            _ => unimplemented!(),
457        }
458    }
459
460    fn handle_ack2(&mut self, seq_num: i32, now: Instant) {
461        // 1) Locate the related ACK in the ACK History Window according to the
462        //    ACK sequence number in this ACK2.
463        let id_in_wnd = match self
464            .ack_history_window
465            .as_slice()
466            .binary_search_by(|entry| entry.ack_seq_num.cmp(&seq_num))
467        {
468            Ok(i) => Some(i),
469            Err(_) => None,
470        };
471
472        if let Some(id) = id_in_wnd {
473            let AckHistoryEntry {
474                timestamp: send_timestamp,
475                ack_number,
476                ..
477            } = self.ack_history_window[id];
478
479            // 2) Update the largest ACK number ever been acknowledged.
480            self.lr_ack_acked = (seq_num, ack_number);
481
482            // 3) Calculate new rtt according to the ACK2 arrival time and the ACK
483            //    departure time, and update the RTT value as: RTT = (RTT * 7 +
484            //    rtt) / 8
485            let immediate_rtt = self.receive_buffer.timestamp_from(now) - send_timestamp;
486            self.rtt = (self.rtt * 7 + immediate_rtt) / 8;
487
488            // 4) Update RTTVar by: RTTVar = (RTTVar * 3 + abs(RTT - rtt)) / 4.
489            self.rtt_variance =
490                (self.rtt_variance * 3 + (self.rtt_variance - immediate_rtt).abs()) / 4;
491
492            // 5) Update both ACK and NAK period to 4 * RTT + RTTVar + SYN.
493            self.timers.update_rtt(self.rtt, self.rtt_variance);
494        } else {
495            warn!(
496                "ACK sequence number in ACK2 packet not found in ACK history: {}",
497                seq_num
498            );
499        }
500    }
501
502    fn handle_data_packet(&mut self, data: &DataPacket, now: Instant) {
503        let ts_now = self.receive_buffer.timestamp_from(now);
504
505        // 2&3 don't apply
506
507        // 4) If the sequence number of the current data packet is 16n + 1,
508        //     where n is an integer, record the time interval between this
509        if data.seq_number % 16 == 0 {
510            self.probe_time = Some(ts_now)
511        } else if data.seq_number % 16 == 1 {
512            // if there is an entry
513            if let Some(pt) = self.probe_time {
514                // calculate and insert
515                self.packet_pair_window.push((data.seq_number, ts_now - pt));
516
517                // reset
518                self.probe_time = None
519            }
520        }
521        // 5) Record the packet arrival time in PKT History Window.
522        self.packet_history_window.push((data.seq_number, ts_now));
523
524        // 6)
525        // a. If the sequence number of the current data packet is greater
526        //    than LRSN, put all the sequence numbers between (but
527        //    excluding) these two values into the receiver's loss list and
528        //    send them to the sender in an NAK packet.
529        match data.seq_number.cmp(&self.lrsn) {
530            Ordering::Greater => {
531                // lrsn is the latest packet received, so nak the one after that
532                for i in seq_num_range(self.lrsn, data.seq_number) {
533                    self.loss_list.push(LossListEntry {
534                        seq_num: i,
535                        feedback_time: ts_now,
536                        // k is initialized at 2, as stated on page 12 (very end)
537                        k: 2,
538                    })
539                }
540
541                self.send_nak(now, seq_num_range(self.lrsn, data.seq_number));
542            }
543            // b. If the sequence number is less than LRSN, remove it from the
544            //    receiver's loss list.
545            Ordering::Less => {
546                match self.loss_list[..].binary_search_by(|ll| ll.seq_num.cmp(&data.seq_number)) {
547                    Ok(i) => {
548                        self.loss_list.remove(i);
549                    }
550                    Err(_) => {
551                        debug!(
552                            "Packet received that's not in the loss list: {:?}, loss_list={:?}",
553                            data.seq_number,
554                            self.loss_list
555                                .iter()
556                                .map(|ll| ll.seq_num.as_raw())
557                                .collect::<Vec<_>>()
558                        );
559                    }
560                };
561            }
562            Ordering::Equal => {}
563        }
564
565        // record that we got this packet
566        self.lrsn = max(data.seq_number + 1, self.lrsn);
567
568        // we've already gotten this packet, drop it
569        if self.receive_buffer.next_release() > data.seq_number {
570            debug!("Received packet {:?} twice", data.seq_number);
571            return;
572        }
573
574        self.receive_buffer.add(data.clone());
575    }
576
577    // send a NAK, and return the future
578    fn send_nak(&mut self, now: Instant, lost_seq_nums: impl Iterator<Item = SeqNumber>) {
579        let vec: Vec<_> = lost_seq_nums.collect();
580        debug!("Sending NAK for={:?}", vec);
581
582        self.send_control(
583            now,
584            ControlTypes::Nak(compress_loss_list(vec.iter().cloned()).collect()),
585        );
586    }
587
588    fn pop_data(&mut self, now: Instant) -> Option<(Instant, Bytes)> {
589        // try to release packets
590        while let Some(d) = self.receive_buffer.next_msg_tsbpd(now) {
591            self.data_release.push_back(d);
592        }
593
594        // drop packets
595        // TODO: do something with this
596        let _dropped = self.receive_buffer.drop_too_late_packets(now);
597
598        self.data_release.pop_front()
599    }
600
601    fn pop_conotrol_packet(&mut self) -> Option<Packet> {
602        self.control_packets.pop_front()
603    }
604
605    fn next_timer(&self, now: Instant) -> Instant {
606        match self.receive_buffer.next_message_release_time(now) {
607            Some(next_rel_time) => min(self.timers.next_timer(), next_rel_time),
608            None => self.timers.next_timer(),
609        }
610    }
611
612    fn send_control(&mut self, now: Instant, control: ControlTypes) {
613        self.control_packets
614            .push_back(Packet::Control(ControlPacket {
615                timestamp: self.receive_buffer.timestamp_from(now),
616                dest_sockid: self.settings.remote_sockid,
617                control_type: control,
618            }));
619    }
620}