srt_protocol/protocol/receiver/
mod.rs

1mod arq;
2mod buffer;
3mod history;
4mod time;
5
6use std::{
7    ops::RangeInclusive,
8    time::{Duration, Instant},
9};
10
11use arq::AutomaticRepeatRequestAlgorithm;
12
13use crate::{
14    connection::ConnectionSettings,
15    packet::*,
16    protocol::{
17        encryption::{Decryption, DecryptionError},
18        output::Output,
19        time::Timers,
20    },
21    statistics::SocketStatistics,
22};
23
24#[derive(Debug, Eq, PartialEq)]
25pub enum DataPacketError {
26    // "Dropping packet {}, receive buffer full"
27    BufferFull {
28        seq_number: SeqNumber,
29        buffer_size: usize,
30    },
31    // "Packet received too far in the future for configured receive buffer size. Discarding packet (buffer would need to be {} packets larger)"
32    PacketTooEarly {
33        seq_number: SeqNumber,
34        buffer_available: usize,
35        buffer_required: usize,
36    },
37    // "Too-late packet {} was received, discarding"
38    PacketTooLate {
39        seq_number: SeqNumber,
40        seq_number_0: SeqNumber,
41    },
42    // "Duplicate packet {}"
43    DiscardedDuplicate {
44        seq_number: SeqNumber,
45    },
46    DecryptionError(DecryptionError),
47}
48
49#[derive(Debug, Eq, PartialEq)]
50pub enum DataPacketAction {
51    Received {
52        lrsn: SeqNumber,
53        recovered: bool,
54    },
55    ReceivedWithLoss(CompressedLossList),
56    ReceivedWithLightAck {
57        light_ack: SeqNumber,
58        recovered: bool,
59    },
60}
61
62#[derive(Debug)]
63pub struct Receiver {
64    pub arq: AutomaticRepeatRequestAlgorithm,
65    pub decryption: Decryption,
66}
67
68impl Receiver {
69    pub fn new(settings: ConnectionSettings) -> Self {
70        Self {
71            arq: AutomaticRepeatRequestAlgorithm::new(
72                settings.socket_start_time,
73                settings.recv_tsbpd_latency,
74                settings.too_late_packet_drop,
75                settings.init_seq_num,
76                settings.recv_buffer_size,
77            ),
78            decryption: Decryption::new(settings.cipher),
79        }
80    }
81
82    pub fn is_flushed(&self) -> bool {
83        self.arq.is_flushed()
84    }
85
86    pub fn rx_acknowledged_time(&self) -> Duration {
87        self.arq.rx_acknowledged_time()
88    }
89}
90
91pub struct ReceiverContext<'a> {
92    timers: &'a mut Timers,
93    output: &'a mut Output,
94    stats: &'a mut SocketStatistics,
95    receiver: &'a mut Receiver,
96}
97
98impl<'a> ReceiverContext<'a> {
99    pub fn new(
100        timers: &'a mut Timers,
101        output: &'a mut Output,
102        stats: &'a mut SocketStatistics,
103        receiver: &'a mut Receiver,
104    ) -> Self {
105        Self {
106            timers,
107            stats,
108            output,
109            receiver,
110        }
111    }
112
113    pub fn synchronize_clock(&mut self, now: Instant, ts: TimeStamp) {
114        if let Some(_adjustment) = self.receiver.arq.synchronize_clock(now, ts) {
115            //self.debug("clock sync", now, &adjustment);
116            self.stats.rx_clock_adjustments += 1;
117        }
118    }
119
120    pub fn handle_data_packet(&mut self, now: Instant, data: DataPacket) {
121        use Acknowledgement::*;
122        use ControlTypes::*;
123        let bytes = data.wire_size() as u64;
124        self.stats.rx_data += 1;
125        self.stats.rx_bytes += bytes;
126
127        let data = self
128            .receiver
129            .decryption
130            .decrypt(data)
131            .map_err(DataPacketError::DecryptionError)
132            .and_then(|(decrypted_bytes, data)| {
133                if decrypted_bytes > 0 {
134                    self.stats.rx_decrypted_data += 1;
135                }
136                self.receiver.arq.handle_data_packet(now, data)
137            });
138
139        match data {
140            Ok(action) => {
141                if action.is_recovered() {
142                    self.stats.rx_retransmit_data += 1;
143                } else {
144                    self.stats.rx_unique_data += 1;
145                    self.stats.rx_unique_bytes += bytes;
146                }
147
148                use DataPacketAction::*;
149                match action {
150                    ReceivedWithLoss(loss_list) => {
151                        self.output.send_control(now, Nak(loss_list));
152                    }
153                    ReceivedWithLightAck { light_ack, .. } => {
154                        self.output.send_control(now, Ack(Lite(light_ack)));
155                    }
156                    _ => {}
157                }
158            }
159            Err(e) => {
160                use DataPacketError::*;
161                match e {
162                    BufferFull { .. } | PacketTooEarly { .. } | PacketTooLate { .. } => {
163                        self.stats.rx_dropped_data += 1;
164                        self.stats.rx_dropped_bytes += bytes;
165                    }
166                    DecryptionError(_) => {
167                        self.stats.rx_decrypt_errors += 1;
168                        self.stats.rx_decrypt_error_bytes += bytes;
169                    }
170                    DiscardedDuplicate { .. } => {}
171                }
172            }
173        }
174    }
175
176    pub fn handle_ack2_packet(&mut self, now: Instant, seq_num: FullAckSeqNumber) {
177        self.stats.rx_ack2 += 1;
178        let rtt = self.receiver.arq.handle_ack2_packet(now, seq_num);
179        if let Some(rtt) = rtt {
180            self.timers.update_rtt(rtt);
181            //self.warn("ack not found", now, &seq_num);
182            self.stats.rx_ack2_errors += 1;
183        }
184    }
185
186    pub fn handle_drop_request(&mut self, now: Instant, drop: RangeInclusive<SeqNumber>) {
187        let range = *drop.start()..*drop.end() + 1;
188        let dropped = self.receiver.arq.handle_drop_request(now, range) as u64;
189        if dropped > 0 {
190            //self.warn("packets dropped", now, &(dropped, drop));
191            self.stats.rx_dropped_data += dropped;
192        }
193    }
194
195    pub fn handle_key_refresh_request(
196        &mut self,
197        now: Instant,
198        keying_material: KeyingMaterialMessage,
199    ) {
200        match self
201            .receiver
202            .decryption
203            .refresh_key_material(keying_material)
204        {
205            Ok(Some(response)) => {
206                // TODO: add statistic or "event" notification?
207                // key rotation
208                self.output.send_control(
209                    now,
210                    ControlTypes::Srt(SrtControlPacket::KeyRefreshResponse(response)),
211                )
212            }
213            Ok(None) => {
214                //self.debug("key refresh request", &"duplicate key"),
215            }
216            Err(_err) => {
217                //self.warn("key refresh", &err),
218            }
219        }
220    }
221
222    pub fn on_full_ack_event(&mut self, now: Instant) {
223        if let Some(ack) = self.receiver.arq.on_full_ack_event(now) {
224            // Pack the ACK packet with RTT, RTT Variance, and flow window size (available
225            // receiver buffer size).
226            self.output.send_control(now, ControlTypes::Ack(ack));
227        }
228    }
229
230    pub fn on_nak_event(&mut self, now: Instant) {
231        if let Some(loss_list) = self.receiver.arq.on_nak_event(now) {
232            self.output.send_control(now, ControlTypes::Nak(loss_list));
233        }
234    }
235
236    pub fn on_close_timeout(&mut self, _now: Instant) {
237        //self.debug("timed out", now, &self.receiver.arq);
238        self.receiver.arq.clear()
239    }
240}
241
242impl DataPacketAction {
243    pub fn is_recovered(&self) -> bool {
244        use DataPacketAction::*;
245        match self {
246            Received { recovered, .. } | ReceivedWithLightAck { recovered, .. } => *recovered,
247            ReceivedWithLoss(_) => false,
248        }
249    }
250}