Skip to main content

srt/protocol/sender/
mod.rs

1use std::collections::VecDeque;
2use std::net::SocketAddr;
3use std::time::{Duration, Instant};
4
5use bytes::Bytes;
6use log::debug;
7use log::{trace, warn};
8
9use super::TimeSpan;
10use crate::loss_compression::decompress_loss_list;
11use crate::packet::{AckControlInfo, ControlTypes, HandshakeControlInfo, SrtControlPacket};
12use crate::protocol::handshake::Handshake;
13use crate::protocol::sender::buffers::*;
14use crate::protocol::Timer;
15use crate::Packet::*;
16use crate::{
17    CCData, CongestCtrl, ConnectionSettings, ControlPacket, DataPacket, Packet, SeqNumber,
18    SrtCongestCtrl,
19};
20
21mod buffers;
22
23#[derive(Debug)]
24pub enum SenderError {}
25
26pub type SenderResult = Result<(), SenderError>;
27
28#[derive(Debug, Clone, Copy)]
29pub struct SenderMetrics {
30    /// Round trip time, in microseconds
31    pub rtt: TimeSpan,
32
33    /// Round trip time variance
34    pub rtt_var: TimeSpan,
35
36    /// packet arrival rate
37    pub pkt_arr_rate: u32,
38
39    /// estimated link capacity
40    pub est_link_cap: i32,
41
42    /// Total lost packets
43    pub lost_packets: u32,
44
45    /// Total retransmitted packets
46    pub retrans_packets: u32,
47
48    /// Total received packets (packets that have been ACKed)
49    pub recvd_packets: u32,
50}
51
52impl SenderMetrics {
53    pub fn new() -> Self {
54        Self {
55            rtt: TimeSpan::from_micros(10_000),
56            rtt_var: TimeSpan::from_micros(0),
57            pkt_arr_rate: 0,
58            est_link_cap: 0,
59            lost_packets: 0,
60            retrans_packets: 0,
61            recvd_packets: 0,
62        }
63    }
64}
65
66#[derive(Debug, Clone)]
67pub enum SenderAlgorithmAction {
68    WaitUntilAck,
69    WaitForData,
70    WaitUntil(Instant),
71    Close,
72}
73
74#[derive(Debug, Clone, PartialEq)]
75pub enum SenderAlgorithmStep {
76    Step1,
77    Step6,
78}
79
80pub struct Sender {
81    /// The settings, including remote sockid and address
82    settings: ConnectionSettings,
83
84    handshake: Handshake,
85
86    /// The congestion control
87    congestion_control: SrtCongestCtrl,
88
89    metrics: SenderMetrics,
90
91    /// The buffer to store packets for retransmission, sorted chronologically
92    send_buffer: SendBuffer,
93
94    /// The buffer to store the next control packet
95    output_buffer: VecDeque<Packet>,
96
97    /// The buffer to store packets for transmission
98    transmit_buffer: TransmitBuffer,
99
100    // 1) Sender's Loss List: The sender's loss list is used to store the
101    //    sequence numbers of the lost packets fed back by the receiver
102    //    through NAK packets or inserted in a timeout event. The numbers
103    //    are stored in increasing order.
104    loss_list: LossList,
105
106    /// The sequence number of the largest acknowledged packet + 1
107    lr_acked_packet: SeqNumber,
108
109    /// The ack sequence number that an ack2 has been sent for
110    lr_acked_ack: i32,
111
112    step: SenderAlgorithmStep,
113
114    snd_timer: Timer,
115
116    close: bool,
117}
118
119impl Default for SenderMetrics {
120    fn default() -> Self {
121        Self::new()
122    }
123}
124
125impl Sender {
126    pub fn new(
127        settings: ConnectionSettings,
128        handshake: Handshake,
129        congestion_control: SrtCongestCtrl,
130    ) -> Self {
131        Self {
132            settings,
133            handshake,
134            congestion_control,
135            metrics: SenderMetrics::new(),
136            send_buffer: SendBuffer::new(&settings),
137            loss_list: LossList::new(&settings),
138            lr_acked_packet: settings.init_seq_num,
139            lr_acked_ack: -1, // TODO: why magic number?
140            output_buffer: VecDeque::new(),
141            transmit_buffer: TransmitBuffer::new(&settings),
142            step: SenderAlgorithmStep::Step1,
143            snd_timer: Timer::new(Duration::from_millis(1), settings.socket_start_time),
144            close: false,
145        }
146    }
147
148    pub fn settings(&self) -> &ConnectionSettings {
149        &self.settings
150    }
151
152    pub fn handle_close(&mut self) {
153        if !self.close {
154            self.close = true;
155        }
156    }
157
158    pub fn handle_data(&mut self, data: (Instant, Bytes)) {
159        self.transmit_buffer.push_message(data);
160    }
161
162    fn handle_snd_timer(&mut self, now: Instant) {
163        self.snd_timer.reset(now);
164        self.step = SenderAlgorithmStep::Step1;
165    }
166
167    pub fn handle_packet(
168        &mut self,
169        (packet, from): (Packet, SocketAddr),
170        now: Instant,
171    ) -> SenderResult {
172        // TODO: record/report packets from invalid hosts?
173        if from != self.settings.remote {
174            return Ok(());
175        }
176
177        log::info!("Received packet {:?}", packet);
178
179        match packet {
180            Control(control) => self.handle_control_packet(control, now),
181            Data(data) => self.handle_data_packet(data, now),
182        }
183    }
184
185    pub fn is_flushed(&mut self) -> bool {
186        self.loss_list.is_empty()
187            && self.transmit_buffer.is_empty()
188            && self.lr_acked_packet == self.transmit_buffer.next_sequence_number
189            && self.send_buffer.is_empty()
190            && self.output_buffer.is_empty()
191    }
192
193    pub fn pop_output(&mut self) -> Option<(Packet, SocketAddr)> {
194        let to = self.settings.remote;
195        self.output_buffer
196            .pop_front()
197            .map(move |packet| (packet, to))
198    }
199
200    pub fn next_action(&mut self, now: Instant) -> SenderAlgorithmAction {
201        use SenderAlgorithmAction::*;
202        use SenderAlgorithmStep::*;
203
204        // don't return close until fully flushed
205        if self.close && self.is_flushed() {
206            debug!("{:?} sending shutdown", self.settings.local_sockid);
207            self.send_control(ControlTypes::Shutdown, now); // TODO: could send more than one
208            return Close;
209        }
210
211        if let Some(exp_time) = self.snd_timer.check_expired(now) {
212            self.handle_snd_timer(exp_time);
213        }
214
215        if self.step == Step6 {
216            return WaitUntil(self.snd_timer.next_instant());
217        }
218
219        //   1) If the sender's loss list is not empty, retransmit the first
220        //      packet in the list and remove it from the list. Go to 5).
221        if let Some(p) = self.loss_list.pop_front() {
222            debug!("Sending packet in loss list, seq={:?}", p.seq_number);
223            self.send_data(p);
224
225            // TODO: returning here will result in sending all the packets in the loss
226            //       list before progressing further through the sender algorithm. This
227            //       appears to be inconsistent with the UDT spec. Is it consistent
228            //       with the reference implementation?
229            return WaitForData;
230        }
231        // TODO: what is messaging mode?
232        // TODO: I honestly don't know what this means
233        //
234        //   2) In messaging mode, if the packets has been the loss list for a
235        //      time more than the application specified TTL, send a message drop
236        //      request and remove all related packets from the loss list. Go to
237        //      1).
238
239        //   3) Wait until there is application data to be sent.
240        else if self.transmit_buffer.is_empty() {
241            // TODO: the spec for 3) seems to suggest waiting at here for data,
242            //       but if execution doesn't jump back to Step1, then many of
243            //       the tests don't pass... WAT?
244            return WaitForData;
245        }
246        //   4)
247        //        a. If the number of unacknowledged packets exceeds the
248        //           flow/congestion window size, wait until an ACK comes. Go to
249        //           1).
250        //        b. Pack a new data packet and send it out.
251        // TODO: account for looping here <--- WAT?
252        else if self.lr_acked_packet
253            < self.transmit_buffer.next_sequence_number - self.congestion_control.window_size()
254        {
255            // flow window exceeded, wait for ACK
256            trace!("Flow window exceeded lr_acked={:?}, next_seq={:?}, window_size={}, next_seq-window={:?}",
257                   self.lr_acked_packet,
258                   self.transmit_buffer.next_sequence_number,
259                   self.congestion_control.window_size(),
260                   self.transmit_buffer.next_sequence_number - self.congestion_control.window_size());
261
262            return WaitUntilAck;
263        } else if let Some(p) = self.pop_transmit_buffer() {
264            self.send_data(p);
265        }
266
267        //   5) If the sequence number of the current packet is 16n, where n is an
268        //      integer, go to 2).
269        if let Some(p) = self.pop_transmit_buffer_16n() {
270            //      NOTE: to get the closest timing, we ignore congestion control
271            //      and send the 16th packet immediately, instead of proceeding to step 2
272            self.send_data(p);
273        }
274
275        //   6) Wait (SND - t) time, where SND is the inter-packet interval
276        //      updated by congestion control and t is the total time used by step
277        //      1 to step 5. Go to 1).
278        self.step = Step6;
279        let period = self.congestion_control.send_interval();
280        self.snd_timer.set_period(period);
281        WaitUntil(self.snd_timer.next_instant())
282    }
283
284    fn handle_data_packet(&mut self, _packet: DataPacket, _now: Instant) -> SenderResult {
285        Ok(())
286    }
287
288    fn handle_control_packet(&mut self, packet: ControlPacket, now: Instant) -> SenderResult {
289        match packet.control_type {
290            ControlTypes::Ack(info) => self.handle_ack_packet(now, &info),
291            ControlTypes::Ack2(_) => {
292                warn!("Sender received ACK2, unusual");
293                Ok(())
294            }
295            ControlTypes::DropRequest { .. } => unimplemented!(),
296            ControlTypes::Handshake(shake) => self.handle_handshake_packet(shake, now),
297            // TODO: reset EXP-ish
298
299            // TODO: case UMSG_CGWARNING: // 100 - Delay Warning
300            //            // One way packet delay is increasing, so decrease the sending rate
301            //            ControlTypes::DelayWarning?
302
303            // TODO: case UMSG_LOSSREPORT: // 011 - Loss Report is this Nak?
304            // TODO: case UMSG_DROPREQ: // 111 - Msg drop request
305            // TODO: case UMSG_PEERERROR: // 1000 - An error has happened to the peer side
306            // TODO: case UMSG_EXT: // 0x7FFF - reserved and user defined messages
307            ControlTypes::Nak(nack) => self.handle_nack_packet(nack),
308            ControlTypes::Shutdown => self.handle_shutdown_packet(),
309            ControlTypes::Srt(srt_packet) => self.handle_srt_control_packet(srt_packet),
310            // The only purpose of keep-alive packet is to tell that the peer is still alive
311            // nothing needs to be done.
312            // TODO: is this actually true? check reference implementation
313            ControlTypes::KeepAlive => Ok(()),
314        }
315    }
316
317    #[allow(clippy::too_many_arguments)]
318    fn handle_ack_packet(&mut self, now: Instant, info: &AckControlInfo) -> SenderResult {
319        // if this ack number is less than or equal to
320        // the largest received ack number, than discard it
321        // this can happen thorough packet reordering OR losing an ACK2 packet
322        if info.ack_number <= self.lr_acked_packet {
323            return Ok(());
324        }
325
326        if info.ack_seq_num <= self.lr_acked_ack {
327            // warn!("Ack sequence number '{}' less than or equal to the previous one recieved: '{}'", ack_seq_num, self.lr_acked_ack);
328            return Ok(());
329        }
330        self.lr_acked_ack = info.ack_seq_num;
331
332        // update the packets received count
333        self.metrics.recvd_packets += info.ack_number - self.lr_acked_packet;
334
335        // 1) Update the largest acknowledged sequence number, which is the ACK number
336        self.lr_acked_packet = info.ack_number;
337
338        // 2) Send back an ACK2 with the same ACK sequence number in this ACK.
339        self.send_control(ControlTypes::Ack2(info.ack_seq_num), now);
340
341        // 3) Update RTT and RTTVar.
342        self.metrics.rtt = info.rtt.unwrap_or_else(|| TimeSpan::from_micros(0));
343        self.metrics.rtt_var = info
344            .rtt_variance
345            .unwrap_or_else(|| TimeSpan::from_micros(0));
346
347        // 4) Update both ACK and NAK period to 4 * RTT + RTTVar + SYN.
348        // TODO: figure out why this makes sense, the sender shouldn't send ACK or NAK packets.
349
350        // 5) Update flow window size.
351        {
352            let cc_info = self.make_cc_info();
353            self.congestion_control.on_ack(&cc_info);
354        }
355
356        // 6) If this is a Light ACK, stop.
357        // TODO: wat
358
359        // 7) Update packet arrival rate: A = (A * 7 + a) / 8, where a is the
360        //    value carried in the ACK.
361        self.metrics.pkt_arr_rate =
362            self.metrics.pkt_arr_rate / 8 * 7 + info.packet_recv_rate.unwrap_or(0) / 8;
363
364        // 8) Update estimated link capacity: B = (B * 7 + b) / 8, where b is
365        //    the value carried in the ACK.
366        self.metrics.est_link_cap =
367            (self.metrics.est_link_cap * 7 + info.est_link_cap.unwrap_or(0)) / 8;
368
369        // 9) Update sender's buffer (by releasing the buffer that has been
370        //    acknowledged).
371        self.send_buffer
372            .release_acknowledged_packets(info.ack_number);
373
374        // 10) Update sender's loss list (by removing all those that has been
375        //     acknowledged).
376        self.metrics.retrans_packets += self.loss_list.remove_acknowledged_packets(info.ack_number);
377
378        Ok(())
379    }
380
381    fn handle_shutdown_packet(&mut self) -> SenderResult {
382        self.close = true;
383        Ok(())
384    }
385
386    fn handle_nack_packet(&mut self, nack: Vec<u32>) -> SenderResult {
387        // 1) Add all sequence numbers carried in the NAK into the sender's loss list.
388        // 2) Update the SND period by rate control (see section 3.6).
389        // 3) Reset the EXP time variable.
390
391        for lost in self
392            .send_buffer
393            .get(decompress_loss_list(nack.iter().cloned()))
394        {
395            let packet = match lost {
396                Ok(p) => p,
397                Err(n) => {
398                    debug!("NAK received for packet {} that's not in the buffer, maybe it's already been ACKed", n);
399                    return Ok(());
400                }
401            };
402
403            self.loss_list.push_back(packet.clone());
404        }
405
406        // update CC
407        if let Some(last_packet) = self.loss_list.back() {
408            let cc_info = self.make_cc_info();
409            self.congestion_control
410                .on_nak(last_packet.seq_number, &cc_info);
411        }
412
413        // TODO: reset EXP
414        Ok(())
415    }
416
417    fn handle_handshake_packet(
418        &mut self,
419        handshake: HandshakeControlInfo,
420        now: Instant,
421    ) -> SenderResult {
422        if let Some(control_type) = self.handshake.handle_handshake(handshake) {
423            self.send_control(control_type, now);
424        }
425        Ok(())
426    }
427
428    fn handle_srt_control_packet(&mut self, packet: SrtControlPacket) -> SenderResult {
429        use self::SrtControlPacket::*;
430
431        match packet {
432            HandshakeRequest(_) | HandshakeResponse(_) => {
433                warn!("Received handshake request or response for an already setup SRT connection")
434            }
435            _ => unimplemented!(),
436        }
437
438        Ok(())
439    }
440
441    fn make_cc_info(&self) -> CCData {
442        CCData {
443            est_bandwidth: self.metrics.est_link_cap,
444            max_segment_size: self.settings.max_packet_size,
445            latest_seq_num: Some(self.transmit_buffer.latest_seqence_number()),
446            packet_arr_rate: self.metrics.pkt_arr_rate,
447            rtt: Duration::from_micros(self.metrics.rtt.as_micros() as u64),
448        }
449    }
450
451    fn pop_transmit_buffer(&mut self) -> Option<DataPacket> {
452        let packet = self.transmit_buffer.pop_front()?;
453        self.congestion_control.on_packet_sent(&self.make_cc_info());
454        self.send_buffer.push_back(packet.clone());
455        Some(packet)
456    }
457
458    fn pop_transmit_buffer_16n(&mut self) -> Option<DataPacket> {
459        match self.transmit_buffer.front().map(|p| p.seq_number % 16) {
460            Some(0) => self.pop_transmit_buffer(),
461            _ => None,
462        }
463    }
464
465    fn send_control(&mut self, control: ControlTypes, now: Instant) {
466        self.output_buffer.push_back(Packet::Control(ControlPacket {
467            timestamp: self.transmit_buffer.timestamp_from(now),
468            dest_sockid: self.settings.remote_sockid,
469            control_type: control,
470        }));
471    }
472
473    fn send_data(&mut self, p: DataPacket) {
474        self.output_buffer.push_back(Packet::Data(p));
475    }
476}