rtc_sctp/queue/
reassembly_queue.rs1use crate::chunk::chunk_payload_data::{ChunkPayloadData, PayloadProtocolIdentifier};
2use crate::util::*;
3use crate::StreamId;
4use shared::error::{Error, Result};
5
6use bytes::{Bytes, BytesMut};
7use std::cmp::Ordering;
8use std::time::Instant;
9
10fn sort_chunks_by_tsn(c: &mut [ChunkPayloadData]) {
11 c.sort_by(|a, b| {
12 if sna32lt(a.tsn, b.tsn) {
13 Ordering::Less
14 } else {
15 Ordering::Greater
16 }
17 });
18}
19
20fn sort_chunks_by_ssn(c: &mut [Chunks]) {
21 c.sort_by(|a, b| {
22 if sna16lt(a.ssn, b.ssn) {
23 Ordering::Less
24 } else {
25 Ordering::Greater
26 }
27 });
28}
29
/// A contiguous slice of reassembled user data, as handed out by
/// `Chunks::next`.
#[derive(Debug, PartialEq)]
pub struct Chunk {
    // Payload bytes for this slice of the message.
    pub bytes: Bytes,
}
36
/// A (possibly partial) user message: the set of payload fragments that
/// share one stream sequence number, kept sorted by TSN.
#[derive(Debug, Clone)]
pub struct Chunks {
    // Stream sequence number of this message (0 for unordered messages).
    pub ssn: u16,
    // Payload protocol identifier taken from the message's chunks.
    pub ppi: PayloadProtocolIdentifier,
    // Fragments of the message, sorted ascending by TSN.
    pub chunks: Vec<ChunkPayloadData>,
    // Byte offset within `chunks[index]` where `next()` resumes reading.
    offset: usize,
    // Index of the fragment `next()` is currently consuming.
    index: usize,
    // Creation time; `ReassemblyQueue::read` compares these to deliver the
    // older of a ready unordered vs. ordered message first.
    timestamp: Instant,
}
48
49impl Chunks {
50 pub fn is_empty(&self) -> bool {
51 self.len() == 0
52 }
53
54 pub fn len(&self) -> usize {
55 let mut l = 0;
56 for c in &self.chunks {
57 l += c.user_data.len();
58 }
59 l
60 }
61
62 pub fn read(&self, buf: &mut [u8]) -> Result<usize> {
64 let mut n_written = 0;
65 for c in &self.chunks {
66 let to_copy = c.user_data.len();
67 let n = std::cmp::min(to_copy, buf.len() - n_written);
68 buf[n_written..n_written + n].copy_from_slice(&c.user_data[..n]);
69 n_written += n;
70 if n < to_copy {
71 return Err(Error::ErrShortBuffer);
72 }
73 }
74 Ok(n_written)
75 }
76
77 pub fn next(&mut self, max_length: usize) -> Option<Chunk> {
78 if self.index >= self.chunks.len() {
79 return None;
80 }
81
82 let mut buf = BytesMut::with_capacity(max_length);
83
84 let mut n_written = 0;
85 while self.index < self.chunks.len() {
86 let to_copy = self.chunks[self.index].user_data[self.offset..].len();
87 let n = std::cmp::min(to_copy, max_length - n_written);
88 buf.extend_from_slice(&self.chunks[self.index].user_data[self.offset..self.offset + n]);
89 n_written += n;
90 if n < to_copy {
91 self.offset += n;
92 return Some(Chunk {
93 bytes: buf.freeze(),
94 });
95 }
96 self.index += 1;
97 self.offset = 0;
98 }
99
100 Some(Chunk {
101 bytes: buf.freeze(),
102 })
103 }
104
105 pub(crate) fn new(
106 ssn: u16,
107 ppi: PayloadProtocolIdentifier,
108 chunks: Vec<ChunkPayloadData>,
109 ) -> Self {
110 Chunks {
111 ssn,
112 ppi,
113 chunks,
114 offset: 0,
115 index: 0,
116 timestamp: Instant::now(),
117 }
118 }
119
120 pub(crate) fn push(&mut self, chunk: ChunkPayloadData) -> bool {
121 for c in &self.chunks {
123 if c.tsn == chunk.tsn {
124 return false;
125 }
126 }
127
128 self.chunks.push(chunk);
130 sort_chunks_by_tsn(&mut self.chunks);
131
132 self.is_complete()
134 }
135
136 pub(crate) fn is_complete(&self) -> bool {
137 let n_chunks = self.chunks.len();
145 if n_chunks == 0 {
146 return false;
147 }
148
149 if !self.chunks[0].beginning_fragment {
151 return false;
152 }
153
154 if !self.chunks[n_chunks - 1].ending_fragment {
156 return false;
157 }
158
159 let mut last_tsn = 0u32;
161 for (i, c) in self.chunks.iter().enumerate() {
162 if i > 0 {
163 if c.tsn != last_tsn + 1 {
170 return false;
172 }
173 }
174
175 last_tsn = c.tsn;
176 }
177
178 true
179 }
180}
181
/// Per-stream reassembly buffer: collects ordered and unordered payload
/// fragments and hands back complete messages in delivery order.
#[derive(Default, Debug)]
pub(crate) struct ReassemblyQueue {
    // Stream identifier this queue belongs to; chunks for other streams
    // are rejected in `push`.
    pub(crate) si: StreamId,
    // SSN expected for the next ordered message to deliver.
    pub(crate) next_ssn: u16,
    // Partial/complete ordered messages, kept sorted ascending by SSN.
    pub(crate) ordered: Vec<Chunks>,
    // Complete unordered messages, ready to be read.
    pub(crate) unordered: Vec<Chunks>,
    // Raw unordered fragments awaiting assembly, kept sorted by TSN.
    pub(crate) unordered_chunks: Vec<ChunkPayloadData>,
    // Total buffered user-data bytes (ordered + unordered).
    pub(crate) n_bytes: usize,
}
192
193impl ReassemblyQueue {
194 pub(crate) fn new(si: StreamId) -> Self {
200 ReassemblyQueue {
201 si,
202 next_ssn: 0, ordered: vec![],
204 unordered: vec![],
205 unordered_chunks: vec![],
206 n_bytes: 0,
207 }
208 }
209
210 pub(crate) fn push(&mut self, chunk: ChunkPayloadData) -> bool {
211 if chunk.stream_identifier != self.si {
212 return false;
213 }
214
215 if chunk.unordered {
216 self.n_bytes += chunk.user_data.len();
219 self.unordered_chunks.push(chunk);
220 sort_chunks_by_tsn(&mut self.unordered_chunks);
221
222 if let Some(cset) = self.find_complete_unordered_chunk_set() {
225 self.unordered.push(cset);
226 return true;
227 }
228
229 false
230 } else {
231 if sna16lt(chunk.stream_sequence_number, self.next_ssn) {
233 return false;
234 }
235
236 self.n_bytes += chunk.user_data.len();
237
238 for s in &mut self.ordered {
240 if s.ssn == chunk.stream_sequence_number {
241 return s.push(chunk);
242 }
243 }
244
245 let mut cset = Chunks::new(chunk.stream_sequence_number, chunk.payload_type, vec![]);
247 let unordered = chunk.unordered;
248 let ok = cset.push(chunk);
249 self.ordered.push(cset);
250 if !unordered {
251 sort_chunks_by_ssn(&mut self.ordered);
252 }
253
254 ok
255 }
256 }
257
258 pub(crate) fn find_complete_unordered_chunk_set(&mut self) -> Option<Chunks> {
259 let mut start_idx = -1isize;
260 let mut n_chunks = 0usize;
261 let mut last_tsn = 0u32;
262 let mut found = false;
263
264 for (i, c) in self.unordered_chunks.iter().enumerate() {
265 if c.beginning_fragment {
267 start_idx = i as isize;
268 n_chunks = 1;
269 last_tsn = c.tsn;
270
271 if c.ending_fragment {
272 found = true;
273 break;
274 }
275 continue;
276 }
277
278 if start_idx < 0 {
279 continue;
280 }
281
282 if c.tsn != last_tsn + 1 {
284 start_idx = -1;
285 continue;
286 }
287
288 last_tsn = c.tsn;
289 n_chunks += 1;
290
291 if c.ending_fragment {
292 found = true;
293 break;
294 }
295 }
296
297 if !found {
298 return None;
299 }
300
301 let chunks: Vec<ChunkPayloadData> = self
303 .unordered_chunks
304 .drain(start_idx as usize..(start_idx as usize) + n_chunks)
305 .collect();
306 Some(Chunks::new(0, chunks[0].payload_type, chunks))
307 }
308
309 pub(crate) fn is_readable(&self) -> bool {
310 if !self.unordered.is_empty() {
312 return true;
314 }
315
316 if !self.ordered.is_empty() {
318 let cset = &self.ordered[0];
319 if cset.is_complete() && sna16lte(cset.ssn, self.next_ssn) {
320 return true;
321 }
322 }
323 false
324 }
325
326 fn readable_unordered_chunks(&self) -> Option<&Chunks> {
327 self.unordered.first()
328 }
329
330 fn readable_ordered_chunks(&self) -> Option<&Chunks> {
331 let ordered = self.ordered.first();
332 if let Some(chunks) = ordered {
333 if !chunks.is_complete() {
334 return None;
335 }
336 if sna16gt(chunks.ssn, self.next_ssn) {
337 return None;
338 }
339 Some(chunks)
340 } else {
341 None
342 }
343 }
344
345 pub(crate) fn read(&mut self) -> Option<Chunks> {
346 let chunks = if let (Some(unordered_chunks), Some(ordered_chunks)) = (
347 self.readable_unordered_chunks(),
348 self.readable_ordered_chunks(),
349 ) {
350 if unordered_chunks.timestamp < ordered_chunks.timestamp {
351 self.unordered.remove(0)
352 } else {
353 if ordered_chunks.ssn == self.next_ssn {
354 self.next_ssn += 1;
355 }
356 self.ordered.remove(0)
357 }
358 } else {
359 if !self.unordered.is_empty() {
361 self.unordered.remove(0)
362 } else if !self.ordered.is_empty() {
363 let chunks = &self.ordered[0];
365 if !chunks.is_complete() {
366 return None;
367 }
368 if sna16gt(chunks.ssn, self.next_ssn) {
369 return None;
370 }
371 if chunks.ssn == self.next_ssn {
372 self.next_ssn += 1;
373 }
374 self.ordered.remove(0)
375 } else {
376 return None;
377 }
378 };
379
380 self.subtract_num_bytes(chunks.len());
381
382 Some(chunks)
383 }
384
385 pub(crate) fn forward_tsn_for_ordered(&mut self, last_ssn: u16) {
388 let num_bytes = self
389 .ordered
390 .iter()
391 .filter(|s| sna16lte(s.ssn, last_ssn) && !s.is_complete())
392 .fold(0, |n, s| {
393 n + s.chunks.iter().fold(0, |acc, c| acc + c.user_data.len())
394 });
395 self.subtract_num_bytes(num_bytes);
396
397 self.ordered
398 .retain(|s| !sna16lte(s.ssn, last_ssn) || s.is_complete());
399
400 if sna16lte(self.next_ssn, last_ssn) {
402 self.next_ssn = last_ssn + 1;
403 }
404 }
405
406 pub(crate) fn forward_tsn_for_unordered(&mut self, new_cumulative_tsn: u32) {
412 let mut last_idx: isize = -1;
413 for (i, c) in self.unordered_chunks.iter().enumerate() {
414 if sna32gt(c.tsn, new_cumulative_tsn) {
415 break;
416 }
417 last_idx = i as isize;
418 }
419 if last_idx >= 0 {
420 for i in 0..(last_idx + 1) as usize {
421 self.subtract_num_bytes(self.unordered_chunks[i].user_data.len());
422 }
423 self.unordered_chunks.drain(..(last_idx + 1) as usize);
424 }
425 }
426
427 pub(crate) fn subtract_num_bytes(&mut self, n_bytes: usize) {
428 if self.n_bytes >= n_bytes {
429 self.n_bytes -= n_bytes;
430 } else {
431 self.n_bytes = 0;
432 }
433 }
434
435 pub(crate) fn get_num_bytes(&self) -> usize {
436 self.n_bytes
437 }
438}