1use std::collections::VecDeque;
2use std::net::SocketAddr;
3use std::time::{Duration, Instant};
4
5use bytes::Bytes;
6use log::debug;
7use log::{trace, warn};
8
9use super::TimeSpan;
10use crate::loss_compression::decompress_loss_list;
11use crate::packet::{AckControlInfo, ControlTypes, HandshakeControlInfo, SrtControlPacket};
12use crate::protocol::handshake::Handshake;
13use crate::protocol::sender::buffers::*;
14use crate::protocol::Timer;
15use crate::Packet::*;
16use crate::{
17 CCData, CongestCtrl, ConnectionSettings, ControlPacket, DataPacket, Packet, SeqNumber,
18 SrtCongestCtrl,
19};
20
21mod buffers;
22
23#[derive(Debug)]
24pub enum SenderError {}
25
26pub type SenderResult = Result<(), SenderError>;
27
28#[derive(Debug, Clone, Copy)]
29pub struct SenderMetrics {
30 pub rtt: TimeSpan,
32
33 pub rtt_var: TimeSpan,
35
36 pub pkt_arr_rate: u32,
38
39 pub est_link_cap: i32,
41
42 pub lost_packets: u32,
44
45 pub retrans_packets: u32,
47
48 pub recvd_packets: u32,
50}
51
52impl SenderMetrics {
53 pub fn new() -> Self {
54 Self {
55 rtt: TimeSpan::from_micros(10_000),
56 rtt_var: TimeSpan::from_micros(0),
57 pkt_arr_rate: 0,
58 est_link_cap: 0,
59 lost_packets: 0,
60 retrans_packets: 0,
61 recvd_packets: 0,
62 }
63 }
64}
65
66#[derive(Debug, Clone)]
67pub enum SenderAlgorithmAction {
68 WaitUntilAck,
69 WaitForData,
70 WaitUntil(Instant),
71 Close,
72}
73
74#[derive(Debug, Clone, PartialEq)]
75pub enum SenderAlgorithmStep {
76 Step1,
77 Step6,
78}
79
80pub struct Sender {
81 settings: ConnectionSettings,
83
84 handshake: Handshake,
85
86 congestion_control: SrtCongestCtrl,
88
89 metrics: SenderMetrics,
90
91 send_buffer: SendBuffer,
93
94 output_buffer: VecDeque<Packet>,
96
97 transmit_buffer: TransmitBuffer,
99
100 loss_list: LossList,
105
106 lr_acked_packet: SeqNumber,
108
109 lr_acked_ack: i32,
111
112 step: SenderAlgorithmStep,
113
114 snd_timer: Timer,
115
116 close: bool,
117}
118
119impl Default for SenderMetrics {
120 fn default() -> Self {
121 Self::new()
122 }
123}
124
125impl Sender {
126 pub fn new(
127 settings: ConnectionSettings,
128 handshake: Handshake,
129 congestion_control: SrtCongestCtrl,
130 ) -> Self {
131 Self {
132 settings,
133 handshake,
134 congestion_control,
135 metrics: SenderMetrics::new(),
136 send_buffer: SendBuffer::new(&settings),
137 loss_list: LossList::new(&settings),
138 lr_acked_packet: settings.init_seq_num,
139 lr_acked_ack: -1, output_buffer: VecDeque::new(),
141 transmit_buffer: TransmitBuffer::new(&settings),
142 step: SenderAlgorithmStep::Step1,
143 snd_timer: Timer::new(Duration::from_millis(1), settings.socket_start_time),
144 close: false,
145 }
146 }
147
148 pub fn settings(&self) -> &ConnectionSettings {
149 &self.settings
150 }
151
152 pub fn handle_close(&mut self) {
153 if !self.close {
154 self.close = true;
155 }
156 }
157
158 pub fn handle_data(&mut self, data: (Instant, Bytes)) {
159 self.transmit_buffer.push_message(data);
160 }
161
162 fn handle_snd_timer(&mut self, now: Instant) {
163 self.snd_timer.reset(now);
164 self.step = SenderAlgorithmStep::Step1;
165 }
166
167 pub fn handle_packet(
168 &mut self,
169 (packet, from): (Packet, SocketAddr),
170 now: Instant,
171 ) -> SenderResult {
172 if from != self.settings.remote {
174 return Ok(());
175 }
176
177 log::info!("Received packet {:?}", packet);
178
179 match packet {
180 Control(control) => self.handle_control_packet(control, now),
181 Data(data) => self.handle_data_packet(data, now),
182 }
183 }
184
185 pub fn is_flushed(&mut self) -> bool {
186 self.loss_list.is_empty()
187 && self.transmit_buffer.is_empty()
188 && self.lr_acked_packet == self.transmit_buffer.next_sequence_number
189 && self.send_buffer.is_empty()
190 && self.output_buffer.is_empty()
191 }
192
193 pub fn pop_output(&mut self) -> Option<(Packet, SocketAddr)> {
194 let to = self.settings.remote;
195 self.output_buffer
196 .pop_front()
197 .map(move |packet| (packet, to))
198 }
199
200 pub fn next_action(&mut self, now: Instant) -> SenderAlgorithmAction {
201 use SenderAlgorithmAction::*;
202 use SenderAlgorithmStep::*;
203
204 if self.close && self.is_flushed() {
206 debug!("{:?} sending shutdown", self.settings.local_sockid);
207 self.send_control(ControlTypes::Shutdown, now); return Close;
209 }
210
211 if let Some(exp_time) = self.snd_timer.check_expired(now) {
212 self.handle_snd_timer(exp_time);
213 }
214
215 if self.step == Step6 {
216 return WaitUntil(self.snd_timer.next_instant());
217 }
218
219 if let Some(p) = self.loss_list.pop_front() {
222 debug!("Sending packet in loss list, seq={:?}", p.seq_number);
223 self.send_data(p);
224
225 return WaitForData;
230 }
231 else if self.transmit_buffer.is_empty() {
241 return WaitForData;
245 }
246 else if self.lr_acked_packet
253 < self.transmit_buffer.next_sequence_number - self.congestion_control.window_size()
254 {
255 trace!("Flow window exceeded lr_acked={:?}, next_seq={:?}, window_size={}, next_seq-window={:?}",
257 self.lr_acked_packet,
258 self.transmit_buffer.next_sequence_number,
259 self.congestion_control.window_size(),
260 self.transmit_buffer.next_sequence_number - self.congestion_control.window_size());
261
262 return WaitUntilAck;
263 } else if let Some(p) = self.pop_transmit_buffer() {
264 self.send_data(p);
265 }
266
267 if let Some(p) = self.pop_transmit_buffer_16n() {
270 self.send_data(p);
273 }
274
275 self.step = Step6;
279 let period = self.congestion_control.send_interval();
280 self.snd_timer.set_period(period);
281 WaitUntil(self.snd_timer.next_instant())
282 }
283
284 fn handle_data_packet(&mut self, _packet: DataPacket, _now: Instant) -> SenderResult {
285 Ok(())
286 }
287
288 fn handle_control_packet(&mut self, packet: ControlPacket, now: Instant) -> SenderResult {
289 match packet.control_type {
290 ControlTypes::Ack(info) => self.handle_ack_packet(now, &info),
291 ControlTypes::Ack2(_) => {
292 warn!("Sender received ACK2, unusual");
293 Ok(())
294 }
295 ControlTypes::DropRequest { .. } => unimplemented!(),
296 ControlTypes::Handshake(shake) => self.handle_handshake_packet(shake, now),
297 ControlTypes::Nak(nack) => self.handle_nack_packet(nack),
308 ControlTypes::Shutdown => self.handle_shutdown_packet(),
309 ControlTypes::Srt(srt_packet) => self.handle_srt_control_packet(srt_packet),
310 ControlTypes::KeepAlive => Ok(()),
314 }
315 }
316
317 #[allow(clippy::too_many_arguments)]
318 fn handle_ack_packet(&mut self, now: Instant, info: &AckControlInfo) -> SenderResult {
319 if info.ack_number <= self.lr_acked_packet {
323 return Ok(());
324 }
325
326 if info.ack_seq_num <= self.lr_acked_ack {
327 return Ok(());
329 }
330 self.lr_acked_ack = info.ack_seq_num;
331
332 self.metrics.recvd_packets += info.ack_number - self.lr_acked_packet;
334
335 self.lr_acked_packet = info.ack_number;
337
338 self.send_control(ControlTypes::Ack2(info.ack_seq_num), now);
340
341 self.metrics.rtt = info.rtt.unwrap_or_else(|| TimeSpan::from_micros(0));
343 self.metrics.rtt_var = info
344 .rtt_variance
345 .unwrap_or_else(|| TimeSpan::from_micros(0));
346
347 {
352 let cc_info = self.make_cc_info();
353 self.congestion_control.on_ack(&cc_info);
354 }
355
356 self.metrics.pkt_arr_rate =
362 self.metrics.pkt_arr_rate / 8 * 7 + info.packet_recv_rate.unwrap_or(0) / 8;
363
364 self.metrics.est_link_cap =
367 (self.metrics.est_link_cap * 7 + info.est_link_cap.unwrap_or(0)) / 8;
368
369 self.send_buffer
372 .release_acknowledged_packets(info.ack_number);
373
374 self.metrics.retrans_packets += self.loss_list.remove_acknowledged_packets(info.ack_number);
377
378 Ok(())
379 }
380
381 fn handle_shutdown_packet(&mut self) -> SenderResult {
382 self.close = true;
383 Ok(())
384 }
385
386 fn handle_nack_packet(&mut self, nack: Vec<u32>) -> SenderResult {
387 for lost in self
392 .send_buffer
393 .get(decompress_loss_list(nack.iter().cloned()))
394 {
395 let packet = match lost {
396 Ok(p) => p,
397 Err(n) => {
398 debug!("NAK received for packet {} that's not in the buffer, maybe it's already been ACKed", n);
399 return Ok(());
400 }
401 };
402
403 self.loss_list.push_back(packet.clone());
404 }
405
406 if let Some(last_packet) = self.loss_list.back() {
408 let cc_info = self.make_cc_info();
409 self.congestion_control
410 .on_nak(last_packet.seq_number, &cc_info);
411 }
412
413 Ok(())
415 }
416
417 fn handle_handshake_packet(
418 &mut self,
419 handshake: HandshakeControlInfo,
420 now: Instant,
421 ) -> SenderResult {
422 if let Some(control_type) = self.handshake.handle_handshake(handshake) {
423 self.send_control(control_type, now);
424 }
425 Ok(())
426 }
427
428 fn handle_srt_control_packet(&mut self, packet: SrtControlPacket) -> SenderResult {
429 use self::SrtControlPacket::*;
430
431 match packet {
432 HandshakeRequest(_) | HandshakeResponse(_) => {
433 warn!("Received handshake request or response for an already setup SRT connection")
434 }
435 _ => unimplemented!(),
436 }
437
438 Ok(())
439 }
440
441 fn make_cc_info(&self) -> CCData {
442 CCData {
443 est_bandwidth: self.metrics.est_link_cap,
444 max_segment_size: self.settings.max_packet_size,
445 latest_seq_num: Some(self.transmit_buffer.latest_seqence_number()),
446 packet_arr_rate: self.metrics.pkt_arr_rate,
447 rtt: Duration::from_micros(self.metrics.rtt.as_micros() as u64),
448 }
449 }
450
451 fn pop_transmit_buffer(&mut self) -> Option<DataPacket> {
452 let packet = self.transmit_buffer.pop_front()?;
453 self.congestion_control.on_packet_sent(&self.make_cc_info());
454 self.send_buffer.push_back(packet.clone());
455 Some(packet)
456 }
457
458 fn pop_transmit_buffer_16n(&mut self) -> Option<DataPacket> {
459 match self.transmit_buffer.front().map(|p| p.seq_number % 16) {
460 Some(0) => self.pop_transmit_buffer(),
461 _ => None,
462 }
463 }
464
465 fn send_control(&mut self, control: ControlTypes, now: Instant) {
466 self.output_buffer.push_back(Packet::Control(ControlPacket {
467 timestamp: self.transmit_buffer.timestamp_from(now),
468 dest_sockid: self.settings.remote_sockid,
469 control_type: control,
470 }));
471 }
472
473 fn send_data(&mut self, p: DataPacket) {
474 self.output_buffer.push_back(Packet::Data(p));
475 }
476}