srt_protocol/protocol/sender/
mod.rs

1mod buffer;
2mod congestion_control;
3mod encapsulate;
4
5use std::{
6    convert::TryFrom,
7    time::{Duration, Instant},
8};
9
10use bytes::Bytes;
11
12use crate::{
13    connection::{ConnectionSettings, ConnectionStatus},
14    options::*,
15    packet::*,
16    protocol::{
17        encryption::Encryption,
18        output::Output,
19        time::{TimeBase, Timers},
20    },
21    statistics::SocketStatistics,
22};
23
24use buffer::{AckAction, Loss, SendBuffer, SenderAction};
25use congestion_control::SenderCongestionControl;
26use encapsulate::Encapsulation;
27
28#[derive(Debug)]
29pub struct Sender {
30    time_base: TimeBase,
31    encapsulation: Encapsulation,
32    encryption: Encryption,
33    send_buffer: SendBuffer,
34    congestion_control: SenderCongestionControl,
35}
36
37impl Sender {
38    pub fn new(settings: ConnectionSettings) -> Self {
39        Self {
40            time_base: TimeBase::new(settings.socket_start_time),
41            encapsulation: Encapsulation::new(&settings),
42            encryption: Encryption::new(settings.cipher.clone()),
43            send_buffer: SendBuffer::new(&settings),
44            congestion_control: SenderCongestionControl::new(settings.bandwidth.clone()),
45        }
46    }
47
48    pub fn is_flushed(&self) -> bool {
49        self.send_buffer.is_flushed()
50    }
51
52    pub fn has_packets_to_send(&self) -> bool {
53        self.send_buffer.has_packets_to_send()
54    }
55
56    pub fn tx_buffered_time(&self) -> Duration {
57        self.send_buffer.duration()
58    }
59
60    pub fn tx_buffered_packets(&self) -> u64 {
61        u64::try_from(self.send_buffer.len()).unwrap()
62    }
63
64    pub fn tx_buffered_bytes(&self) -> u64 {
65        u64::try_from(self.send_buffer.len_bytes()).unwrap()
66    }
67}
68
69pub struct SenderContext<'a> {
70    status: &'a mut ConnectionStatus,
71    timers: &'a mut Timers,
72    output: &'a mut Output,
73    stats: &'a mut SocketStatistics,
74    sender: &'a mut Sender,
75}
76
77impl<'a> SenderContext<'a> {
78    pub fn new(
79        status: &'a mut ConnectionStatus,
80        timers: &'a mut Timers,
81        output: &'a mut Output,
82        stats: &'a mut SocketStatistics,
83        sender: &'a mut Sender,
84    ) -> Self {
85        Self {
86            status,
87            timers,
88            output,
89            stats,
90            sender,
91        }
92    }
93
94    pub fn handle_data(&mut self, now: Instant, item: (Instant, Bytes)) {
95        let (time, data) = item;
96        let (mut packets, mut bytes) = (0, 0);
97        let ts = self.sender.time_base.timestamp_from(time);
98        for packet in self.sender.encapsulation.encapsulate(ts, data) {
99            if let Some((bytes_enc, packet, km)) = self.sender.encryption.encrypt(packet) {
100                packets += 1;
101                bytes += packet.payload.len() as u64;
102                if bytes_enc > 0 {
103                    self.stats.tx_encrypted_data += 1;
104                }
105
106                if let Err((p_count, b_count)) = self.sender.send_buffer.push_data(packet) {
107                    self.stats.tx_dropped_data += p_count.0;
108                    self.stats.tx_dropped_bytes += b_count.0;
109                }
110
111                let control = km.map(ControlTypes::new_key_refresh_request);
112                if let Some(control) = control {
113                    self.output.send_control(now, control);
114                }
115            }
116        }
117
118        let snd_period =
119            self.sender
120                .congestion_control
121                .on_input(now, PacketCount(packets), ByteCount(bytes));
122        if let Some(snd_period) = snd_period {
123            self.timers.update_snd_period(snd_period)
124        }
125    }
126
127    pub fn handle_ack_packet(&mut self, now: Instant, ack: Acknowledgement) {
128        self.stats.rx_ack += 1;
129        if matches!(ack, Acknowledgement::Lite(_)) {
130            self.stats.rx_light_ack += 1;
131        }
132
133        match self.sender.send_buffer.update_largest_acked_seq_number(
134            ack.ack_number(),
135            ack.full_ack_seq_number(),
136            ack.rtt(),
137        ) {
138            Ok(AckAction {
139                received: _,
140                recovered: _,
141                send_ack2,
142            }) => {
143                // TODO: add received and recovered to connection statistics
144                if let Some(full_ack) = send_ack2 {
145                    self.output.send_control(now, ControlTypes::Ack2(full_ack))
146                }
147            }
148            Err(_error) => {
149                // self.warn("ack", now, &error);
150                // TODO: add statistic
151                // self.statistics.rx_ack2_errors += 1;
152            }
153        }
154    }
155
156    pub fn handle_nak_packet(&mut self, now: Instant, nak: CompressedLossList) {
157        self.stats.rx_nak += 1;
158        // 1) Add all sequence numbers carried in the NAK into the sender's loss list.
159        for (loss, range) in self.sender.send_buffer.add_to_loss_list(nak) {
160            //self.debug("nak", now, &(&loss, &range));
161            // TODO: figure out better statistics
162            use Loss::*;
163            match loss {
164                Ignored | Added => {
165                    self.stats.tx_loss_data += 1;
166                }
167                Dropped => {
168                    self.stats.tx_dropped_data += 1;
169
170                    // On a Live stream, where each packet is a message, just one NAK with
171                    // a compressed packet loss interval of significant size (e.g. [1,
172                    // 100_000] will result in a deluge of message drop request packet
173                    // transmissions from the sender, resembling a DoS attack on the receiver.
174                    // Even more pathological, this is most likely to happen when we absolutely
175                    // do not want it to happen, such as during periods of decreased network
176                    // throughput.
177                    //
178                    // For this reason, this implementation is explicitly inconsistent with the
179                    // reference implementation, which only sends a single message per message
180                    // drop request, if the message is still in the send buffer. We always send
181                    self.output.send_control(
182                        now,
183                        ControlTypes::new_drop_request(MsgNumber::new_truncate(0), range),
184                    )
185                }
186            }
187        }
188    }
189
190    pub fn handle_key_refresh_response(&mut self, keying_material: KeyingMaterialMessage) {
191        match self
192            .sender
193            .encryption
194            .handle_key_refresh_response(keying_material)
195        {
196            Ok(()) => {
197                // TODO: add statistic or "event" notification?
198            }
199            Err(_err) => {
200                //self.warn("key refresh response", &err),
201            }
202        }
203    }
204
205    pub fn on_snd_event(&mut self, now: Instant, elapsed_periods: u32) {
206        use SenderAction::*;
207        let ts_now = self.sender.time_base.timestamp_from(now);
208        let actions = self.sender.send_buffer.next_snd_actions(
209            ts_now,
210            elapsed_periods,
211            self.status.should_drain_send_buffer(),
212        );
213        for action in actions {
214            match action {
215                Send(d) => {
216                    self.stats.tx_unique_data += 1;
217                    self.output.send_data(now, d);
218                }
219                RetransmitNak(d) => {
220                    self.stats.tx_retransmit_data += 1;
221                    self.output.send_data(now, d);
222                }
223                RetransmitRto(d) => {
224                    self.stats.tx_retransmit_data += 1;
225                    self.output.send_data(now, d);
226                }
227                Drop(_) => {}
228                WaitForInput => {
229                    break;
230                }
231                WaitForAck { .. } => {
232                    break;
233                }
234            }
235        }
236    }
237}