s2n_quic_core/buffer/reassembler.rs

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

//! This module contains data structures for buffering incoming streams.

use crate::{
    buffer::{
        reader::storage::{Chunk, Infallible as _},
        Error, Reader,
    },
    varint::VarInt,
};
use alloc::collections::{vec_deque, VecDeque};
use bytes::BytesMut;

mod duplex;
mod probe;
mod reader;
mod request;
mod slot;
mod writer;

#[cfg(test)]
mod tests;

use request::Request;
use slot::Slot;

/// The default buffer size for slots that the [`Reassembler`] uses.
///
/// This value was picked as it matches the typical memory page size.
const MIN_BUFFER_ALLOCATION_SIZE: usize = 4096;

/// The value used when the final size is unknown.
///
/// By using `u64::MAX` we don't have to special-case any of the logic. Also note that the
/// actual maximum size of any stream is `VarInt::MAX`, so this isn't a valid stream size.
const UNKNOWN_FINAL_SIZE: u64 = u64::MAX;

//= https://www.rfc-editor.org/rfc/rfc9000#section-2.2
//# Endpoints MUST be able to deliver stream data to an application as an
//# ordered byte-stream.

/// `Reassembler` is a buffer structure for reassembling chunks of an ordered
/// byte stream that may arrive out of order.
///
/// `Reassembler` accumulates the bytes and provides them to its users once a
/// contiguous range of bytes at the current position of the stream has been
/// accumulated.
///
/// `Reassembler` is optimized for minimizing memory allocations and for
/// offering its users chunks of sizes that minimize call overhead.
///
/// If data is received in small chunks, only the first chunk will trigger a
/// memory allocation. All other chunks can be copied into the already allocated
/// region.
///
/// When users want to consume data from the buffer, the consumable part of the
/// internal receive buffer is split off and passed back to the caller. Since
/// the returned chunk is a view onto a reference-counted internal [`BytesMut`]
/// buffer, this is efficient and does not require additional memory
/// allocation or copying.
///
/// ## Usage
///
/// ```rust
/// use s2n_quic_core::buffer::Reassembler;
///
/// let mut buffer = Reassembler::new();
///
/// // write a chunk of bytes at offset 4, which cannot be consumed yet
/// assert!(buffer.write_at(4u32.into(), &[4, 5, 6, 7]).is_ok());
/// assert_eq!(0, buffer.len());
/// assert_eq!(None, buffer.pop());
///
/// // write a chunk of bytes at offset 0, which allows for consumption
/// assert!(buffer.write_at(0u32.into(), &[0, 1, 2, 3]).is_ok());
/// assert_eq!(8, buffer.len());
///
/// // Pop chunks. Since they all fit into a single internal buffer,
/// // they are returned as one combined chunk.
/// assert_eq!(&[0u8, 1, 2, 3, 4, 5, 6, 7], &buffer.pop().unwrap()[..]);
/// ```
#[derive(Debug, PartialEq, Default)]
pub struct Reassembler {
    slots: VecDeque<Slot>,
    cursors: Cursors,
}

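/// Tracks the stream's read cursor (`start_offset`), the highest offset received so far, and
/// the final size of the stream, if known.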
#[derive(Clone, Copy, Debug, PartialEq)]
struct Cursors {
    start_offset: u64,
    max_recv_offset: u64,
    final_offset: u64,
}

impl Cursors {
    #[inline]
    fn final_size(&self) -> Option<u64> {
        if self.final_offset == UNKNOWN_FINAL_SIZE {
            None
        } else {
            Some(self.final_offset)
        }
    }
}

impl Default for Cursors {
    #[inline]
    fn default() -> Self {
        Self {
            start_offset: 0,
            max_recv_offset: 0,
            final_offset: UNKNOWN_FINAL_SIZE,
        }
    }
}

impl Reassembler {
    /// Creates a new `Reassembler`
    #[inline]
    pub fn new() -> Reassembler {
        Self::default()
    }

    /// Returns `true` if the buffer has been completely written and the final size is known
    #[inline]
    pub fn is_writing_complete(&self) -> bool {
        self.final_size()
            .is_some_and(|len| self.total_received_len() == len)
    }

    /// Returns `true` if the buffer has been completely read and the final size is known
    #[inline]
    pub fn is_reading_complete(&self) -> bool {
        self.final_size() == Some(self.cursors.start_offset)
    }

    /// Returns the final size of the stream, if known
    #[inline]
    pub fn final_size(&self) -> Option<u64> {
        self.cursors.final_size()
    }

    /// Returns the number of bytes available for reading.
    ///
    /// This equals the amount of data stored contiguously at the start of the buffer.
    #[inline]
    pub fn len(&self) -> usize {
        self.report().0
    }

    /// Returns `true` if no bytes are available for reading
    #[inline]
    pub fn is_empty(&self) -> bool {
        if let Some(slot) = self.slots.front() {
            !slot.is_occupied(self.cursors.start_offset)
        } else {
            true
        }
    }

    /// Returns the number of bytes and chunks available for consumption
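    ///
    /// A small usage sketch (here the data fits in a single slot, so it reports as one chunk):
    ///
    /// ```rust
    /// use s2n_quic_core::buffer::Reassembler;
    ///
    /// let mut buffer = Reassembler::new();
    /// assert!(buffer.write_at(0u32.into(), &[0, 1, 2, 3]).is_ok());
    ///
    /// let (bytes, chunks) = buffer.report();
    /// assert_eq!(4, bytes);
    /// assert_eq!(1, chunks);
    /// ```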
    #[inline]
    pub fn report(&self) -> (usize, usize) {
        let mut bytes = 0;
        let mut chunks = 0;
        for chunk in self.iter() {
            bytes += chunk.len();
            chunks += 1;
        }
        (bytes, chunks)
    }

    /// Pushes a slice at a certain offset
    #[inline]
    pub fn write_at(&mut self, offset: VarInt, data: &[u8]) -> Result<(), Error> {
        let mut request = Request::new(offset, data, false)?;
        self.write_reader(&mut request)?;
        Ok(())
    }

    /// Pushes a slice at a certain offset, where the end of the slice marks the end of the stream
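    ///
    /// A small usage sketch, writing a single chunk that ends the stream at offset 4:
    ///
    /// ```rust
    /// use s2n_quic_core::buffer::Reassembler;
    ///
    /// let mut buffer = Reassembler::new();
    ///
    /// assert!(buffer.write_at_fin(0u32.into(), &[0, 1, 2, 3]).is_ok());
    /// assert_eq!(Some(4), buffer.final_size());
    /// assert!(buffer.is_writing_complete());
    /// assert_eq!(&[0u8, 1, 2, 3], &buffer.pop().unwrap()[..]);
    /// ```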
    #[inline]
    pub fn write_at_fin(&mut self, offset: VarInt, data: &[u8]) -> Result<(), Error> {
        let mut request = Request::new(offset, data, true)?;
        self.write_reader(&mut request)?;
        Ok(())
    }

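    /// Writes the contents of a [`Reader`] into the buffer, starting at the reader's current offset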
    #[inline]
    pub fn write_reader<R>(&mut self, reader: &mut R) -> Result<(), Error<R::Error>>
    where
        R: Reader + ?Sized,
    {
        // Trims off any data that has already been received
        reader.skip_until(self.current_offset())?;

        // store a snapshot of the cursors in case there's an error
        let snapshot = self.cursors;

        self.check_reader_fin(reader)?;

        if let Err(err) = self.write_reader_impl(reader) {
            use core::any::TypeId;
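            // roll back the cursor updates made by `check_reader_fin` when a fallible reader
            // fails part-way; an `Infallible` reader can never reach this branch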
            if TypeId::of::<R::Error>() != TypeId::of::<core::convert::Infallible>() {
                self.cursors = snapshot;
            }
            return Err(Error::ReaderError(err));
        }

        self.invariants();

        Ok(())
    }

    /// Ensures the final offset doesn't change
    #[inline]
    fn check_reader_fin<R>(&mut self, reader: &mut R) -> Result<(), Error<R::Error>>
    where
        R: Reader + ?Sized,
    {
        let buffered_offset = reader
            .current_offset()
            .checked_add_usize(reader.buffered_len())
            .ok_or(Error::OutOfRange)?
            .as_u64();

        //= https://www.rfc-editor.org/rfc/rfc9000#section-4.5
        //# Once a final size for a stream is known, it cannot change.  If a
        //# RESET_STREAM or STREAM frame is received indicating a change in the
        //# final size for the stream, an endpoint SHOULD respond with an error
        //# of type FINAL_SIZE_ERROR; see Section 11 for details on error
        //# handling.
        match (reader.final_offset(), self.final_size()) {
            (Some(actual), Some(expected)) => {
                ensure!(actual == expected, Err(Error::InvalidFin));
            }
            (Some(final_offset), None) => {
                let final_offset = final_offset.as_u64();

                // make sure that we didn't see any previous chunks greater than the final size
                ensure!(
                    self.cursors.max_recv_offset <= final_offset,
                    Err(Error::InvalidFin)
                );

                self.cursors.final_offset = final_offset;
            }
            (None, Some(expected)) => {
                // make sure the reader doesn't exceed a previously known final offset
                ensure!(expected >= buffered_offset, Err(Error::InvalidFin));
            }
            (None, None) => {}
        }

        // record the maximum offset that we've seen
        self.cursors.max_recv_offset = self.cursors.max_recv_offset.max(buffered_offset);

        Ok(())
    }

    #[inline(always)]
    fn write_reader_impl<R>(&mut self, reader: &mut R) -> Result<(), R::Error>
    where
        R: Reader + ?Sized,
    {
        // if the reader is empty at this point, just make sure it doesn't return an error
        if reader.buffer_is_empty() {
            let _chunk = reader.read_chunk(0)?;
            return Ok(());
        }

        let mut selected = None;

        // start from the back with the assumption that most data arrives in order
        for idx in (0..self.slots.len()).rev() {
            let Some(slot) = self.slots.get(idx) else {
                unsafe {
                    // SAFETY: `idx` should always be in bounds, since it's generated by the range
                    // `0..slots.len()`
                    assume!(false);
                }
            };

            // find the first slot that we can write into
            ensure!(slot.start() <= reader.current_offset().as_u64(), continue);

            selected = Some(idx);
            break;
        }

        let idx = if let Some(idx) = selected {
            idx
        } else {
            let mut idx = 0;
            // no existing slot can hold the reader's offset, so allocate a new one at the front
            let mut slot = self.allocate_slot(reader);

            // before pushing the slot, make sure the reader doesn't fail
            let filled = slot.try_write_reader(reader, &mut true)?;

            if let Some(slot) = filled {
                self.slots.push_front(slot);
                idx += 1;
            }
            self.slots.push_front(slot);

            ensure!(!reader.buffer_is_empty(), Ok(()));

            idx
        };

        self.write_reader_at(reader, idx)?;
        Ok(())
    }

    #[inline(always)]
    fn write_reader_at<R>(&mut self, reader: &mut R, mut idx: usize) -> Result<(), R::Error>
    where
        R: Reader + ?Sized,
    {
        let initial_idx = idx;
        let mut filled_slot = false;

        unsafe {
            assume!(
                !reader.buffer_is_empty(),
                "the first write should always be non-empty"
            );
        }

        while !reader.buffer_is_empty() {
            let Some(slot) = self.slots.get_mut(idx) else {
                unsafe {
                    // SAFETY: `idx` should always be in bounds, since it's provided by a range
                    // that was bound to `slots.len()`
                    assume!(false);
                }
            };

            // try filling the slot with the reader
            let filled = slot.try_write_reader(reader, &mut filled_slot)?;

            idx += 1;
            if let Some(slot) = filled {
                self.insert(idx, slot);
                idx += 1;
            }

            // if the reader is empty then we're done copying
            ensure!(!reader.buffer_is_empty(), break);

            // we need to start allocating new slots
            self.write_reader_with_alloc(reader, &mut idx, &mut filled_slot)?;

            continue;
        }

        // only try unsplitting if we filled at least one slot
        if filled_slot {
            self.unsplit_range(initial_idx..idx);
        }

        Ok(())
    }

    #[inline(always)]
    fn write_reader_with_alloc<R>(
        &mut self,
        reader: &mut R,
        idx: &mut usize,
        filled_slot: &mut bool,
    ) -> Result<(), R::Error>
    where
        R: Reader + ?Sized,
    {
        while !reader.buffer_is_empty() {
            if let Some(next) = self.slots.get(*idx) {
                // the next slot is able to handle the reader so yield
                ensure!(next.start() > reader.current_offset().as_u64(), break);
            }

            // allocate a new slot for the reader
            let mut slot = self.allocate_slot(reader);

            // try filling the slot with the reader
            let filled = slot.try_write_reader(reader, filled_slot)?;

            // insert the newly allocated slot if the reader succeeded
            self.insert(*idx, slot);

            *idx += 1;
            if let Some(slot) = filled {
                self.insert(*idx, slot);
                *idx += 1;
            }
        }

        Ok(())
    }

    #[inline]
    fn unsplit_range(&mut self, range: core::ops::Range<usize>) {
        // try to merge all of the slots that were modified
        for idx in range.rev() {
            let Some(slot) = self.slots.get(idx) else {
                unsafe {
                    // SAFETY: `idx` should always be in bounds, since it's provided by a range
                    // that was bound to `slots.len()`
                    assume!(false);
                }
            };

            // if this slot was completed, we should try and unsplit with the next slot
            ensure!(slot.is_full(), continue);

            let start = slot.start();
            let end = slot.end();

            let Some(next) = self.slots.get(idx + 1) else {
                continue;
            };

            ensure!(next.start() == end, continue);

            let current_block = Self::align_offset(start, Self::allocation_size(start));
            let next_block = Self::align_offset(next.start(), Self::allocation_size(next.start()));
            ensure!(current_block == next_block, continue);

            if let Some(next) = self.slots.remove(idx + 1) {
                self.slots[idx].unsplit(next);
            } else {
                unsafe {
                    // SAFETY: `idx` should always be in bounds, since it's provided by a range
                    // that was bound to `slots.len()`
                    assume!(false, "idx + 1 was checked above");
                }
            }
        }
    }

    /// Advances the read and write cursors and discards any held data
    ///
    /// This can be used for copy-avoidance applications where a packet is received in order and
    /// doesn't need to be stored temporarily for future packets to unblock the stream.
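    ///
    /// A small usage sketch, skipping data that was already delivered to the application elsewhere:
    ///
    /// ```rust
    /// use s2n_quic_core::buffer::Reassembler;
    ///
    /// let mut buffer = Reassembler::new();
    ///
    /// // advance past the first 4 bytes without buffering them
    /// assert!(buffer.skip(4u32.into()).is_ok());
    /// assert_eq!(4, buffer.consumed_len());
    ///
    /// // a write that only covers the skipped range is ignored
    /// assert!(buffer.write_at(0u32.into(), &[0, 1, 2, 3]).is_ok());
    /// assert_eq!(None, buffer.pop());
    /// ```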
    #[inline]
    pub fn skip(&mut self, len: VarInt) -> Result<(), Error> {
        // zero-length skip is a no-op
        ensure!(len > VarInt::ZERO, Ok(()));

        let new_start_offset = self
            .cursors
            .start_offset
            .checked_add(len.as_u64())
            .and_then(|v| VarInt::new(v).ok())
            .ok_or(Error::OutOfRange)?;

        if let Some(final_size) = self.final_size() {
            ensure!(
                final_size >= new_start_offset.as_u64(),
                Err(Error::InvalidFin)
            );
        }

        // record the maximum offset that we've seen
        self.cursors.max_recv_offset = self.cursors.max_recv_offset.max(new_start_offset.as_u64());

        // update the current start offset
        self.cursors.start_offset = new_start_offset.as_u64();

        // clear out the slots to the new start offset
        while let Some(mut slot) = self.slots.pop_front() {
            // the new offset consumes the slot so drop and continue
            if slot.end_allocated() < new_start_offset.as_u64() {
                continue;
            }

            // skip to the new offset
            slot.skip_until(new_start_offset).unwrap();

            // put the slot back if it's still needed
            if !slot.should_drop() {
                self.slots.push_front(slot);
            }

            break;
        }

        self.invariants();

        Ok(())
    }

    /// Iterates over all of the chunks available for reading
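    ///
    /// A small usage sketch; unlike `pop`, iterating leaves the data buffered:
    ///
    /// ```rust
    /// use s2n_quic_core::buffer::Reassembler;
    ///
    /// let mut buffer = Reassembler::new();
    /// assert!(buffer.write_at(0u32.into(), &[0, 1, 2, 3]).is_ok());
    ///
    /// let total: usize = buffer.iter().map(|chunk| chunk.len()).sum();
    /// assert_eq!(4, total);
    /// assert_eq!(4, buffer.len());
    /// ```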
    #[inline]
    pub fn iter(&self) -> impl Iterator<Item = &[u8]> {
        Iter::new(self)
    }

    /// Drains all of the currently available chunks
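    ///
    /// A small usage sketch; each drained chunk is removed from the buffer:
    ///
    /// ```rust
    /// use s2n_quic_core::buffer::Reassembler;
    ///
    /// let mut buffer = Reassembler::new();
    /// assert!(buffer.write_at(0u32.into(), &[0, 1, 2, 3]).is_ok());
    ///
    /// let drained: usize = buffer.drain().map(|chunk| chunk.len()).sum();
    /// assert_eq!(4, drained);
    /// assert!(buffer.is_empty());
    /// ```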
    #[inline]
    pub fn drain(&mut self) -> impl Iterator<Item = BytesMut> + '_ {
        Drain { inner: self }
    }

    /// Pops a buffer from the front of the receive queue if available
    #[inline]
    pub fn pop(&mut self) -> Option<BytesMut> {
        self.pop_watermarked(usize::MAX)
    }

    /// Pops a buffer from the front of the receive queue, whose length is guaranteed to never
    /// exceed the provided `watermark`.
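    ///
    /// A small usage sketch (the exact chunk boundaries here assume the data sits in one slot):
    ///
    /// ```rust
    /// use s2n_quic_core::buffer::Reassembler;
    ///
    /// let mut buffer = Reassembler::new();
    /// assert!(buffer.write_at(0u32.into(), &[0, 1, 2, 3]).is_ok());
    ///
    /// // each call returns at most `watermark` bytes
    /// assert_eq!(&[0u8, 1], &buffer.pop_watermarked(2).unwrap()[..]);
    /// assert_eq!(&[2u8, 3], &buffer.pop_watermarked(2).unwrap()[..]);
    /// assert_eq!(None, buffer.pop_watermarked(2));
    /// ```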
    #[inline]
    pub fn pop_watermarked(&mut self, watermark: usize) -> Option<BytesMut> {
        let Chunk::BytesMut(chunk) = self.infallible_read_chunk(watermark) else {
            unsafe { assume!(false) }
        };

        ensure!(!chunk.is_empty(), None);

        Some(chunk)
    }

    /// Returns the amount of data that has already been consumed from the
    /// receive buffer.
    #[inline]
    pub fn consumed_len(&self) -> u64 {
        self.cursors.start_offset
    }

    /// Returns the total amount of contiguous received data.
    ///
    /// This includes the already consumed data as well as the data that is still
    /// buffered and available for consumption.
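    ///
    /// A small usage sketch; consumed data still counts towards the total:
    ///
    /// ```rust
    /// use s2n_quic_core::buffer::Reassembler;
    ///
    /// let mut buffer = Reassembler::new();
    /// assert!(buffer.write_at(0u32.into(), &[0, 1, 2, 3]).is_ok());
    /// assert_eq!(&[0u8, 1, 2, 3], &buffer.pop().unwrap()[..]);
    ///
    /// assert_eq!(4, buffer.total_received_len());
    /// assert_eq!(4, buffer.consumed_len());
    /// ```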
    #[inline]
    pub fn total_received_len(&self) -> u64 {
        let mut offset = self.cursors.start_offset;

        for slot in &self.slots {
            ensure!(slot.is_occupied(offset), offset);
            offset = slot.end();
        }

        offset
    }

    /// Resets the receive buffer.
    ///
    /// This will drop all previously received data.
    #[inline]
    pub fn reset(&mut self) {
        self.slots.clear();
        self.cursors = Default::default();
    }

    #[inline(always)]
    fn insert(&mut self, idx: usize, slot: Slot) {
        if self.slots.len() < idx {
            debug_assert_eq!(self.slots.len() + 1, idx);
            self.slots.push_back(slot);
        } else {
            self.slots.insert(idx, slot);
        }
    }

    /// Allocates a slot for a reader
    #[inline]
    fn allocate_slot<R>(&mut self, reader: &R) -> Slot
    where
        R: Reader + ?Sized,
    {
        let start = reader.current_offset().as_u64();
        let mut size = Self::allocation_size(start);
        let mut offset = Self::align_offset(start, size);

        // don't allocate for data we've already consumed
        if let Some(diff) = self.cursors.start_offset.checked_sub(offset) {
            if diff > 0 {
                debug_assert!(
                    reader.current_offset().as_u64() >= self.cursors.start_offset,
                    "requests should be split before allocating slots"
                );
                offset = self.cursors.start_offset;
                size -= diff as usize;
            }
        }

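        // if the reader ends the stream exactly at its final offset, shrink the allocation to
        // just the remaining bytes instead of a full block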
        if self.cursors.final_offset
            - reader.current_offset().as_u64()
            - reader.buffered_len() as u64
            == 0
        {
            let size_candidate = (start - offset) as usize + reader.buffered_len();
            if size_candidate < size {
                size = size_candidate;
            }
        }

        let buffer = BytesMut::with_capacity(size);

        let end = offset + size as u64;
        Slot::new(offset, end, buffer)
    }

    /// Aligns an offset to a certain alignment size
    #[inline(always)]
    fn align_offset(offset: u64, alignment: usize) -> u64 {
        unsafe {
            assume!(alignment > 0);
        }
        (offset / (alignment as u64)) * (alignment as u64)
    }

    /// Returns the desired allocation size for the given offset
    ///
    /// The allocation size gradually increases as the offset increases. This is under
    /// the assumption that streams that receive a lot of data will continue to receive
    /// a lot of data.
    ///
    /// The current table is as follows:
    ///
    /// | offset         | allocation size |
    /// |----------------|-----------------|
    /// | 0              | 4096            |
    /// | 65536          | 16384           |
    /// | 262144         | 32768           |
    /// | 1048576        | 65536           |
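    ///
    /// For example, an offset of 70_000 is at least 65_536 but below 262_144, so a slot
    /// allocated at that offset would get a 16_384 byte buffer.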
    #[inline(always)]
    fn allocation_size(offset: u64) -> usize {
        for pow in (2..=4).rev() {
            let mult = 1 << pow;
            let square = mult * mult;
            let min_offset = (MIN_BUFFER_ALLOCATION_SIZE * square) as u64;
            let allocation_size = MIN_BUFFER_ALLOCATION_SIZE * mult;

            if offset >= min_offset {
                return allocation_size;
            }
        }

        MIN_BUFFER_ALLOCATION_SIZE
    }

    #[inline(always)]
    fn invariants(&self) {
        if cfg!(debug_assertions) {
            assert_eq!(
                self.total_received_len(),
                self.consumed_len() + self.len() as u64
            );

            let (actual_len, chunks) = self.report();

            assert_eq!(actual_len == 0, self.is_empty());
            assert_eq!(self.iter().count(), chunks);

            let mut prev_end = self.cursors.start_offset;

            for (idx, slot) in self.slots.iter().enumerate() {
                assert!(slot.start() >= prev_end, "{self:#?}");
                assert!(!slot.should_drop(), "slot range should be non-empty");
                prev_end = slot.end_allocated();

                // if the slot is full, make sure it was unsplit into the next slot
                if slot.is_full() {
                    let start = slot.start();
                    let end = slot.end();

                    let Some(next) = self.slots.get(idx + 1) else {
                        continue;
                    };

                    ensure!(next.start() == end, continue);

                    let current_block = Self::align_offset(start, Self::allocation_size(start));
                    let next_block =
                        Self::align_offset(next.start(), Self::allocation_size(next.start()));
                    ensure!(current_block == next_block, continue);

                    panic!("unmerged slots at {idx} and {} {self:#?}", idx + 1);
                }
            }
        }
    }
}

pub struct Iter<'a> {
    prev_end: u64,
    inner: vec_deque::Iter<'a, Slot>,
}

impl<'a> Iter<'a> {
    #[inline]
    fn new(buffer: &'a Reassembler) -> Self {
        Self {
            prev_end: buffer.cursors.start_offset,
            inner: buffer.slots.iter(),
        }
    }
}

impl<'a> Iterator for Iter<'a> {
    type Item = &'a [u8];

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        let slot = self.inner.next()?;

        ensure!(slot.is_occupied(self.prev_end), None);

        self.prev_end = slot.end();
        Some(slot.as_slice())
    }
}

pub struct Drain<'a> {
    inner: &'a mut Reassembler,
}

impl Iterator for Drain<'_> {
    type Item = BytesMut;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.pop()
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        let len = self.inner.slots.len();
        (len, Some(len))
    }
}