s2n_quic_core/buffer/reassembler/
reader.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::Reassembler;
5use crate::{
6    buffer::{
7        reader::{
8            storage::{Chunk, Infallible},
9            Reader, Storage,
10        },
11        writer,
12    },
13    varint::VarInt,
14};
15use bytes::BytesMut;
16
17impl Storage for Reassembler {
18    type Error = core::convert::Infallible;
19
20    #[inline]
21    fn buffered_len(&self) -> usize {
22        self.len()
23    }
24
25    #[inline]
26    fn buffer_is_empty(&self) -> bool {
27        self.is_empty()
28    }
29
30    #[inline]
31    fn read_chunk(&mut self, watermark: usize) -> Result<Chunk<'_>, Self::Error> {
32        let Some(slot) = self.slots.front_mut() else {
33            return Ok(BytesMut::new().into());
34        };
35
36        // make sure the slot has some data
37        ensure!(
38            slot.is_occupied(self.cursors.start_offset),
39            Ok(BytesMut::new().into())
40        );
41
42        // if we have a final size and this slot overlaps it then return the entire thing
43        let chunk = if self.cursors.final_size().is_some_and(|final_size| {
44            final_size <= slot.end_allocated() && watermark >= slot.buffered_len()
45        }) {
46            slot.consume()
47        } else {
48            let Chunk::BytesMut(chunk) = slot.read_chunk(watermark)? else {
49                unsafe { assume!(false) }
50            };
51            chunk
52        };
53
54        if slot.should_drop() {
55            // remove empty buffers
56            self.slots.pop_front();
57        }
58
59        super::probe::pop(self.cursors.start_offset, chunk.len());
60
61        self.cursors.start_offset += chunk.len() as u64;
62
63        self.invariants();
64
65        Ok(chunk.into())
66    }
67
68    #[inline]
69    fn partial_copy_into<Dest>(&mut self, dest: &mut Dest) -> Result<Chunk<'_>, Self::Error>
70    where
71        Dest: writer::Storage + ?Sized,
72    {
73        // ensure we have enough capacity in the destination buf
74        ensure!(dest.has_remaining_capacity(), Ok(Default::default()));
75
76        let mut prev = BytesMut::new();
77
78        loop {
79            let remaining = dest.remaining_capacity();
80            unsafe {
81                assume!(prev.len() <= remaining);
82            }
83            let watermark = remaining - prev.len();
84
85            debug_assert!(remaining > 0);
86
87            let Chunk::BytesMut(chunk) = self.infallible_read_chunk(watermark) else {
88                unsafe { assume!(false) }
89            };
90
91            // if the chunk is empty then return the previous value
92            ensure!(!chunk.is_empty(), Ok(prev.into()));
93
94            // flush the previous chunk if needed
95            if !prev.is_empty() {
96                dest.put_bytes_mut(prev);
97            }
98
99            // if the chunk is exactly the same size as the watermark, then return it
100            if chunk.len() == watermark {
101                return Ok(chunk.into());
102            }
103
104            // store the chunk for another iteration, in case we can pull more
105            prev = chunk;
106        }
107    }
108
109    #[inline]
110    fn copy_into<Dest>(&mut self, dest: &mut Dest) -> Result<(), Self::Error>
111    where
112        Dest: writer::Storage + ?Sized,
113    {
114        // if the destination wants bytes then use the partial copy logic instead
115        if Dest::SPECIALIZES_BYTES || Dest::SPECIALIZES_BYTES_MUT {
116            let mut chunk = self.infallible_partial_copy_into(dest);
117            chunk.infallible_copy_into(dest);
118            return Ok(());
119        }
120
121        loop {
122            // ensure we have enough capacity in the destination buf
123            ensure!(dest.has_remaining_capacity(), Ok(()));
124
125            let Some(slot) = self.slots.front_mut() else {
126                return Ok(());
127            };
128
129            // make sure the slot has some data
130            ensure!(slot.is_occupied(self.cursors.start_offset), Ok(()));
131
132            // avoid refcounting if the destination wants slices
133            let mut dest = dest.track_write();
134            slot.infallible_copy_into(&mut dest);
135
136            if slot.should_drop() {
137                // remove empty buffers
138                self.slots.pop_front();
139            }
140
141            super::probe::pop(self.cursors.start_offset, dest.written_len());
142
143            self.cursors.start_offset += dest.written_len() as u64;
144
145            self.invariants();
146        }
147    }
148}
149
150impl Reader for Reassembler {
151    #[inline]
152    fn current_offset(&self) -> VarInt {
153        unsafe {
154            // SAFETY: offset will always fit into a VarInt
155            VarInt::new_unchecked(self.cursors.start_offset)
156        }
157    }
158
159    #[inline]
160    fn final_offset(&self) -> Option<VarInt> {
161        self.final_size().map(|v| unsafe {
162            // SAFETY: offset will always fit into a VarInt
163            VarInt::new_unchecked(v)
164        })
165    }
166}
167
168#[cfg(test)]
169mod tests {
170    use super::*;
171
172    #[test]
173    fn undersized_dest_partial_copy_into_test() {
174        let mut reassembler = Reassembler::default();
175
176        reassembler.write_at(VarInt::ZERO, b"hello").unwrap();
177
178        let mut dest = &mut [0u8; 1][..];
179        let chunk = reassembler.infallible_partial_copy_into(&mut dest);
180        assert_eq!(dest.len(), 1, "the destination should not be written into");
181        assert_eq!(&chunk[..], b"h");
182
183        assert_eq!(reassembler.current_offset().as_u64(), 1);
184    }
185
186    #[test]
187    fn oversized_dest_partial_copy_into_test() {
188        let mut reassembler = Reassembler::default();
189
190        reassembler.write_at(VarInt::ZERO, b"hello").unwrap();
191
192        let mut reader = reassembler.with_checks();
193
194        let mut dest = &mut [0u8; 10][..];
195        let chunk = reader.infallible_partial_copy_into(&mut dest);
196        assert_eq!(dest.len(), 10, "the destination should not be written into");
197        assert_eq!(&chunk[..], b"hello");
198
199        assert_eq!(reader.current_offset().as_u64(), 5);
200    }
201
202    #[test]
203    fn multiple_chunk_dest_partial_copy_into_test() {
204        let mut reassembler = Reassembler::default();
205
206        // align the cursor to just before a slot boundary
207        let offset: VarInt = (super::super::MIN_BUFFER_ALLOCATION_SIZE - 1)
208            .try_into()
209            .unwrap();
210        reassembler.skip(offset).unwrap();
211        reassembler.write_at(offset, b"hello").unwrap();
212
213        let mut reader = reassembler.with_checks();
214        let mut dest = [0u8; 10];
215
216        let chunk = {
217            let mut dest = &mut dest[..];
218            let chunk = reader.infallible_partial_copy_into(&mut dest);
219            assert_eq!(
220                dest.len(),
221                9,
222                "the destination should have a single byte written to it"
223            );
224            chunk
225        };
226
227        assert_eq!(&dest[..1], b"h");
228        assert_eq!(&chunk[..], b"ello");
229    }
230}