srt_protocol/protocol/sender/
mod.rs1mod buffer;
2mod congestion_control;
3mod encapsulate;
4
5use std::{
6 convert::TryFrom,
7 time::{Duration, Instant},
8};
9
10use bytes::Bytes;
11
12use crate::{
13 connection::{ConnectionSettings, ConnectionStatus},
14 options::*,
15 packet::*,
16 protocol::{
17 encryption::Encryption,
18 output::Output,
19 time::{TimeBase, Timers},
20 },
21 statistics::SocketStatistics,
22};
23
24use buffer::{AckAction, Loss, SendBuffer, SenderAction};
25use congestion_control::SenderCongestionControl;
26use encapsulate::Encapsulation;
27
28#[derive(Debug)]
29pub struct Sender {
30 time_base: TimeBase,
31 encapsulation: Encapsulation,
32 encryption: Encryption,
33 send_buffer: SendBuffer,
34 congestion_control: SenderCongestionControl,
35}
36
37impl Sender {
38 pub fn new(settings: ConnectionSettings) -> Self {
39 Self {
40 time_base: TimeBase::new(settings.socket_start_time),
41 encapsulation: Encapsulation::new(&settings),
42 encryption: Encryption::new(settings.cipher.clone()),
43 send_buffer: SendBuffer::new(&settings),
44 congestion_control: SenderCongestionControl::new(settings.bandwidth.clone()),
45 }
46 }
47
48 pub fn is_flushed(&self) -> bool {
49 self.send_buffer.is_flushed()
50 }
51
52 pub fn has_packets_to_send(&self) -> bool {
53 self.send_buffer.has_packets_to_send()
54 }
55
56 pub fn tx_buffered_time(&self) -> Duration {
57 self.send_buffer.duration()
58 }
59
60 pub fn tx_buffered_packets(&self) -> u64 {
61 u64::try_from(self.send_buffer.len()).unwrap()
62 }
63
64 pub fn tx_buffered_bytes(&self) -> u64 {
65 u64::try_from(self.send_buffer.len_bytes()).unwrap()
66 }
67}
68
69pub struct SenderContext<'a> {
70 status: &'a mut ConnectionStatus,
71 timers: &'a mut Timers,
72 output: &'a mut Output,
73 stats: &'a mut SocketStatistics,
74 sender: &'a mut Sender,
75}
76
77impl<'a> SenderContext<'a> {
78 pub fn new(
79 status: &'a mut ConnectionStatus,
80 timers: &'a mut Timers,
81 output: &'a mut Output,
82 stats: &'a mut SocketStatistics,
83 sender: &'a mut Sender,
84 ) -> Self {
85 Self {
86 status,
87 timers,
88 output,
89 stats,
90 sender,
91 }
92 }
93
94 pub fn handle_data(&mut self, now: Instant, item: (Instant, Bytes)) {
95 let (time, data) = item;
96 let (mut packets, mut bytes) = (0, 0);
97 let ts = self.sender.time_base.timestamp_from(time);
98 for packet in self.sender.encapsulation.encapsulate(ts, data) {
99 if let Some((bytes_enc, packet, km)) = self.sender.encryption.encrypt(packet) {
100 packets += 1;
101 bytes += packet.payload.len() as u64;
102 if bytes_enc > 0 {
103 self.stats.tx_encrypted_data += 1;
104 }
105
106 if let Err((p_count, b_count)) = self.sender.send_buffer.push_data(packet) {
107 self.stats.tx_dropped_data += p_count.0;
108 self.stats.tx_dropped_bytes += b_count.0;
109 }
110
111 let control = km.map(ControlTypes::new_key_refresh_request);
112 if let Some(control) = control {
113 self.output.send_control(now, control);
114 }
115 }
116 }
117
118 let snd_period =
119 self.sender
120 .congestion_control
121 .on_input(now, PacketCount(packets), ByteCount(bytes));
122 if let Some(snd_period) = snd_period {
123 self.timers.update_snd_period(snd_period)
124 }
125 }
126
127 pub fn handle_ack_packet(&mut self, now: Instant, ack: Acknowledgement) {
128 self.stats.rx_ack += 1;
129 if matches!(ack, Acknowledgement::Lite(_)) {
130 self.stats.rx_light_ack += 1;
131 }
132
133 match self.sender.send_buffer.update_largest_acked_seq_number(
134 ack.ack_number(),
135 ack.full_ack_seq_number(),
136 ack.rtt(),
137 ) {
138 Ok(AckAction {
139 received: _,
140 recovered: _,
141 send_ack2,
142 }) => {
143 if let Some(full_ack) = send_ack2 {
145 self.output.send_control(now, ControlTypes::Ack2(full_ack))
146 }
147 }
148 Err(_error) => {
149 }
153 }
154 }
155
156 pub fn handle_nak_packet(&mut self, now: Instant, nak: CompressedLossList) {
157 self.stats.rx_nak += 1;
158 for (loss, range) in self.sender.send_buffer.add_to_loss_list(nak) {
160 use Loss::*;
163 match loss {
164 Ignored | Added => {
165 self.stats.tx_loss_data += 1;
166 }
167 Dropped => {
168 self.stats.tx_dropped_data += 1;
169
170 self.output.send_control(
182 now,
183 ControlTypes::new_drop_request(MsgNumber::new_truncate(0), range),
184 )
185 }
186 }
187 }
188 }
189
190 pub fn handle_key_refresh_response(&mut self, keying_material: KeyingMaterialMessage) {
191 match self
192 .sender
193 .encryption
194 .handle_key_refresh_response(keying_material)
195 {
196 Ok(()) => {
197 }
199 Err(_err) => {
200 }
202 }
203 }
204
205 pub fn on_snd_event(&mut self, now: Instant, elapsed_periods: u32) {
206 use SenderAction::*;
207 let ts_now = self.sender.time_base.timestamp_from(now);
208 let actions = self.sender.send_buffer.next_snd_actions(
209 ts_now,
210 elapsed_periods,
211 self.status.should_drain_send_buffer(),
212 );
213 for action in actions {
214 match action {
215 Send(d) => {
216 self.stats.tx_unique_data += 1;
217 self.output.send_data(now, d);
218 }
219 RetransmitNak(d) => {
220 self.stats.tx_retransmit_data += 1;
221 self.output.send_data(now, d);
222 }
223 RetransmitRto(d) => {
224 self.stats.tx_retransmit_data += 1;
225 self.output.send_data(now, d);
226 }
227 Drop(_) => {}
228 WaitForInput => {
229 break;
230 }
231 WaitForAck { .. } => {
232 break;
233 }
234 }
235 }
236 }
237}