rtc_sctp/queue/
reassembly_queue.rs

1use crate::chunk::chunk_payload_data::{ChunkPayloadData, PayloadProtocolIdentifier};
2use crate::util::*;
3use crate::StreamId;
4use shared::error::{Error, Result};
5
6use bytes::{Bytes, BytesMut};
7use std::cmp::Ordering;
8use std::time::Instant;
9
10fn sort_chunks_by_tsn(c: &mut [ChunkPayloadData]) {
11    c.sort_by(|a, b| {
12        if sna32lt(a.tsn, b.tsn) {
13            Ordering::Less
14        } else {
15            Ordering::Greater
16        }
17    });
18}
19
20fn sort_chunks_by_ssn(c: &mut [Chunks]) {
21    c.sort_by(|a, b| {
22        if sna16lt(a.ssn, b.ssn) {
23            Ordering::Less
24        } else {
25            Ordering::Greater
26        }
27    });
28}
29
30/// A chunk of data from the stream
31#[derive(Debug, PartialEq)]
32pub struct Chunk {
33    /// The contents of the chunk
34    pub bytes: Bytes,
35}
36
37/// Chunks is a set of chunks that share the same SSN
38#[derive(Debug, Clone)]
39pub struct Chunks {
40    /// used only with the ordered chunks
41    pub ssn: u16,
42    pub ppi: PayloadProtocolIdentifier,
43    pub chunks: Vec<ChunkPayloadData>,
44    offset: usize,
45    index: usize,
46    timestamp: Instant,
47}
48
49impl Chunks {
50    pub fn is_empty(&self) -> bool {
51        self.len() == 0
52    }
53
54    pub fn len(&self) -> usize {
55        let mut l = 0;
56        for c in &self.chunks {
57            l += c.user_data.len();
58        }
59        l
60    }
61
62    // Concat all fragments into the buffer
63    pub fn read(&self, buf: &mut [u8]) -> Result<usize> {
64        let mut n_written = 0;
65        for c in &self.chunks {
66            let to_copy = c.user_data.len();
67            let n = std::cmp::min(to_copy, buf.len() - n_written);
68            buf[n_written..n_written + n].copy_from_slice(&c.user_data[..n]);
69            n_written += n;
70            if n < to_copy {
71                return Err(Error::ErrShortBuffer);
72            }
73        }
74        Ok(n_written)
75    }
76
77    pub fn next(&mut self, max_length: usize) -> Option<Chunk> {
78        if self.index >= self.chunks.len() {
79            return None;
80        }
81
82        let mut buf = BytesMut::with_capacity(max_length);
83
84        let mut n_written = 0;
85        while self.index < self.chunks.len() {
86            let to_copy = self.chunks[self.index].user_data[self.offset..].len();
87            let n = std::cmp::min(to_copy, max_length - n_written);
88            buf.extend_from_slice(&self.chunks[self.index].user_data[self.offset..self.offset + n]);
89            n_written += n;
90            if n < to_copy {
91                self.offset += n;
92                return Some(Chunk {
93                    bytes: buf.freeze(),
94                });
95            }
96            self.index += 1;
97            self.offset = 0;
98        }
99
100        Some(Chunk {
101            bytes: buf.freeze(),
102        })
103    }
104
105    pub(crate) fn new(
106        ssn: u16,
107        ppi: PayloadProtocolIdentifier,
108        chunks: Vec<ChunkPayloadData>,
109    ) -> Self {
110        Chunks {
111            ssn,
112            ppi,
113            chunks,
114            offset: 0,
115            index: 0,
116            timestamp: Instant::now(),
117        }
118    }
119
120    pub(crate) fn push(&mut self, chunk: ChunkPayloadData) -> bool {
121        // check if dup
122        for c in &self.chunks {
123            if c.tsn == chunk.tsn {
124                return false;
125            }
126        }
127
128        // append and sort
129        self.chunks.push(chunk);
130        sort_chunks_by_tsn(&mut self.chunks);
131
132        // Check if we now have a complete set
133        self.is_complete()
134    }
135
136    pub(crate) fn is_complete(&self) -> bool {
137        // Condition for complete set
138        //   0. Has at least one chunk.
139        //   1. Begins with beginningFragment set to true
140        //   2. Ends with endingFragment set to true
141        //   3. TSN monotinically increase by 1 from beginning to end
142
143        // 0.
144        let n_chunks = self.chunks.len();
145        if n_chunks == 0 {
146            return false;
147        }
148
149        // 1.
150        if !self.chunks[0].beginning_fragment {
151            return false;
152        }
153
154        // 2.
155        if !self.chunks[n_chunks - 1].ending_fragment {
156            return false;
157        }
158
159        // 3.
160        let mut last_tsn = 0u32;
161        for (i, c) in self.chunks.iter().enumerate() {
162            if i > 0 {
163                // Fragments must have contiguous TSN
164                // From RFC 4960 Section 3.3.1:
165                //   When a user message is fragmented into multiple chunks, the TSNs are
166                //   used by the receiver to reassemble the message.  This means that the
167                //   TSNs for each fragment of a fragmented user message MUST be strictly
168                //   sequential.
169                if c.tsn != last_tsn + 1 {
170                    // mid or end fragment is missing
171                    return false;
172                }
173            }
174
175            last_tsn = c.tsn;
176        }
177
178        true
179    }
180}
181
182#[derive(Default, Debug)]
183pub(crate) struct ReassemblyQueue {
184    pub(crate) si: StreamId,
185    pub(crate) next_ssn: u16,
186    /// expected SSN for next ordered chunk
187    pub(crate) ordered: Vec<Chunks>,
188    pub(crate) unordered: Vec<Chunks>,
189    pub(crate) unordered_chunks: Vec<ChunkPayloadData>,
190    pub(crate) n_bytes: usize,
191}
192
193impl ReassemblyQueue {
194    /// From RFC 4960 Sec 6.5:
195    ///   The Stream Sequence Number in all the streams MUST start from 0 when
196    ///   the association is Established.  Also, when the Stream Sequence
197    ///   Number reaches the value 65535 the next Stream Sequence Number MUST
198    ///   be set to 0.
199    pub(crate) fn new(si: StreamId) -> Self {
200        ReassemblyQueue {
201            si,
202            next_ssn: 0, // From RFC 4960 Sec 6.5:
203            ordered: vec![],
204            unordered: vec![],
205            unordered_chunks: vec![],
206            n_bytes: 0,
207        }
208    }
209
210    pub(crate) fn push(&mut self, chunk: ChunkPayloadData) -> bool {
211        if chunk.stream_identifier != self.si {
212            return false;
213        }
214
215        if chunk.unordered {
216            // First, insert into unordered_chunks array
217            //atomic.AddUint64(&r.n_bytes, uint64(len(chunk.userData)))
218            self.n_bytes += chunk.user_data.len();
219            self.unordered_chunks.push(chunk);
220            sort_chunks_by_tsn(&mut self.unordered_chunks);
221
222            // Scan unordered_chunks that are contiguous (in TSN)
223            // If found, append the complete set to the unordered array
224            if let Some(cset) = self.find_complete_unordered_chunk_set() {
225                self.unordered.push(cset);
226                return true;
227            }
228
229            false
230        } else {
231            // This is an ordered chunk
232            if sna16lt(chunk.stream_sequence_number, self.next_ssn) {
233                return false;
234            }
235
236            self.n_bytes += chunk.user_data.len();
237
238            // Check if a chunkSet with the SSN already exists
239            for s in &mut self.ordered {
240                if s.ssn == chunk.stream_sequence_number {
241                    return s.push(chunk);
242                }
243            }
244
245            // If not found, create a new chunkSet
246            let mut cset = Chunks::new(chunk.stream_sequence_number, chunk.payload_type, vec![]);
247            let unordered = chunk.unordered;
248            let ok = cset.push(chunk);
249            self.ordered.push(cset);
250            if !unordered {
251                sort_chunks_by_ssn(&mut self.ordered);
252            }
253
254            ok
255        }
256    }
257
258    pub(crate) fn find_complete_unordered_chunk_set(&mut self) -> Option<Chunks> {
259        let mut start_idx = -1isize;
260        let mut n_chunks = 0usize;
261        let mut last_tsn = 0u32;
262        let mut found = false;
263
264        for (i, c) in self.unordered_chunks.iter().enumerate() {
265            // seek beginning
266            if c.beginning_fragment {
267                start_idx = i as isize;
268                n_chunks = 1;
269                last_tsn = c.tsn;
270
271                if c.ending_fragment {
272                    found = true;
273                    break;
274                }
275                continue;
276            }
277
278            if start_idx < 0 {
279                continue;
280            }
281
282            // Check if contiguous in TSN
283            if c.tsn != last_tsn + 1 {
284                start_idx = -1;
285                continue;
286            }
287
288            last_tsn = c.tsn;
289            n_chunks += 1;
290
291            if c.ending_fragment {
292                found = true;
293                break;
294            }
295        }
296
297        if !found {
298            return None;
299        }
300
301        // Extract the range of chunks
302        let chunks: Vec<ChunkPayloadData> = self
303            .unordered_chunks
304            .drain(start_idx as usize..(start_idx as usize) + n_chunks)
305            .collect();
306        Some(Chunks::new(0, chunks[0].payload_type, chunks))
307    }
308
309    pub(crate) fn is_readable(&self) -> bool {
310        // Check unordered first
311        if !self.unordered.is_empty() {
312            // The chunk sets in self.unordered should all be complete.
313            return true;
314        }
315
316        // Check ordered sets
317        if !self.ordered.is_empty() {
318            let cset = &self.ordered[0];
319            if cset.is_complete() && sna16lte(cset.ssn, self.next_ssn) {
320                return true;
321            }
322        }
323        false
324    }
325
326    fn readable_unordered_chunks(&self) -> Option<&Chunks> {
327        self.unordered.first()
328    }
329
330    fn readable_ordered_chunks(&self) -> Option<&Chunks> {
331        let ordered = self.ordered.first();
332        if let Some(chunks) = ordered {
333            if !chunks.is_complete() {
334                return None;
335            }
336            if sna16gt(chunks.ssn, self.next_ssn) {
337                return None;
338            }
339            Some(chunks)
340        } else {
341            None
342        }
343    }
344
345    pub(crate) fn read(&mut self) -> Option<Chunks> {
346        let chunks = if let (Some(unordered_chunks), Some(ordered_chunks)) = (
347            self.readable_unordered_chunks(),
348            self.readable_ordered_chunks(),
349        ) {
350            if unordered_chunks.timestamp < ordered_chunks.timestamp {
351                self.unordered.remove(0)
352            } else {
353                if ordered_chunks.ssn == self.next_ssn {
354                    self.next_ssn += 1;
355                }
356                self.ordered.remove(0)
357            }
358        } else {
359            // Check unordered first
360            if !self.unordered.is_empty() {
361                self.unordered.remove(0)
362            } else if !self.ordered.is_empty() {
363                // Now, check ordered
364                let chunks = &self.ordered[0];
365                if !chunks.is_complete() {
366                    return None;
367                }
368                if sna16gt(chunks.ssn, self.next_ssn) {
369                    return None;
370                }
371                if chunks.ssn == self.next_ssn {
372                    self.next_ssn += 1;
373                }
374                self.ordered.remove(0)
375            } else {
376                return None;
377            }
378        };
379
380        self.subtract_num_bytes(chunks.len());
381
382        Some(chunks)
383    }
384
385    /// Use last_ssn to locate a chunkSet then remove it if the set has
386    /// not been complete
387    pub(crate) fn forward_tsn_for_ordered(&mut self, last_ssn: u16) {
388        let num_bytes = self
389            .ordered
390            .iter()
391            .filter(|s| sna16lte(s.ssn, last_ssn) && !s.is_complete())
392            .fold(0, |n, s| {
393                n + s.chunks.iter().fold(0, |acc, c| acc + c.user_data.len())
394            });
395        self.subtract_num_bytes(num_bytes);
396
397        self.ordered
398            .retain(|s| !sna16lte(s.ssn, last_ssn) || s.is_complete());
399
400        // Finally, forward next_ssn
401        if sna16lte(self.next_ssn, last_ssn) {
402            self.next_ssn = last_ssn + 1;
403        }
404    }
405
406    /// Remove all fragments in the unordered sets that contains chunks
407    /// equal to or older than `new_cumulative_tsn`.
408    /// We know all sets in the r.unordered are complete ones.
409    /// Just remove chunks that are equal to or older than new_cumulative_tsn
410    /// from the unordered_chunks
411    pub(crate) fn forward_tsn_for_unordered(&mut self, new_cumulative_tsn: u32) {
412        let mut last_idx: isize = -1;
413        for (i, c) in self.unordered_chunks.iter().enumerate() {
414            if sna32gt(c.tsn, new_cumulative_tsn) {
415                break;
416            }
417            last_idx = i as isize;
418        }
419        if last_idx >= 0 {
420            for i in 0..(last_idx + 1) as usize {
421                self.subtract_num_bytes(self.unordered_chunks[i].user_data.len());
422            }
423            self.unordered_chunks.drain(..(last_idx + 1) as usize);
424        }
425    }
426
427    pub(crate) fn subtract_num_bytes(&mut self, n_bytes: usize) {
428        if self.n_bytes >= n_bytes {
429            self.n_bytes -= n_bytes;
430        } else {
431            self.n_bytes = 0;
432        }
433    }
434
435    pub(crate) fn get_num_bytes(&self) -> usize {
436        self.n_bytes
437    }
438}