s2n_quic_core/buffer/duplex/
interposer.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    buffer::{
6        duplex,
7        reader::{self, Reader, Storage as _},
8        writer::{self, Writer},
9        Error,
10    },
11    varint::VarInt,
12};
13use core::convert::Infallible;
14
15/// A wrapper around an underlying buffer (`duplex`) which will prefer to read/write from a
16/// user-provided temporary buffer (`storage`). The underlying buffer (`duplex`)'s current
17/// position and total length are updated if needed.
18pub struct Interposer<'a, S, D>
19where
20    S: writer::Storage + ?Sized,
21    D: duplex::Skip<Error = Infallible> + ?Sized,
22{
23    storage: &'a mut S,
24    duplex: &'a mut D,
25}
26
27impl<'a, S, D> Interposer<'a, S, D>
28where
29    S: writer::Storage + ?Sized,
30    D: duplex::Skip<Error = Infallible> + ?Sized,
31{
32    #[inline]
33    pub fn new(storage: &'a mut S, duplex: &'a mut D) -> Self {
34        debug_assert!(
35            !storage.has_remaining_capacity() || duplex.buffer_is_empty(),
36            "`duplex` (len={}) should be drained into `storage` (cap={}) before constructing an Interposer",
37            duplex.buffered_len(),
38            storage.remaining_capacity()
39        );
40
41        Self { storage, duplex }
42    }
43}
44
45/// Delegates to the inner Duplex
46impl<S, D> reader::Storage for Interposer<'_, S, D>
47where
48    S: writer::Storage + ?Sized,
49    D: duplex::Skip<Error = Infallible> + ?Sized,
50{
51    type Error = D::Error;
52
53    #[inline]
54    fn buffered_len(&self) -> usize {
55        self.duplex.buffered_len()
56    }
57
58    #[inline]
59    fn buffer_is_empty(&self) -> bool {
60        self.duplex.buffer_is_empty()
61    }
62
63    #[inline]
64    fn read_chunk(&mut self, watermark: usize) -> Result<reader::storage::Chunk<'_>, Self::Error> {
65        self.duplex.read_chunk(watermark)
66    }
67
68    #[inline]
69    fn partial_copy_into<Dest>(
70        &mut self,
71        dest: &mut Dest,
72    ) -> Result<reader::storage::Chunk<'_>, Self::Error>
73    where
74        Dest: writer::Storage + ?Sized,
75    {
76        self.duplex.partial_copy_into(dest)
77    }
78
79    #[inline]
80    fn copy_into<Dest>(&mut self, dest: &mut Dest) -> Result<(), Self::Error>
81    where
82        Dest: writer::Storage + ?Sized,
83    {
84        self.duplex.copy_into(dest)
85    }
86}
87
88/// Delegates to the inner Duplex
89impl<C, D> Reader for Interposer<'_, C, D>
90where
91    C: writer::Storage + ?Sized,
92    D: duplex::Skip<Error = Infallible> + ?Sized,
93{
94    #[inline]
95    fn current_offset(&self) -> VarInt {
96        self.duplex.current_offset()
97    }
98
99    #[inline]
100    fn final_offset(&self) -> Option<VarInt> {
101        self.duplex.final_offset()
102    }
103
104    #[inline]
105    fn has_buffered_fin(&self) -> bool {
106        self.duplex.has_buffered_fin()
107    }
108
109    #[inline]
110    fn is_consumed(&self) -> bool {
111        self.duplex.is_consumed()
112    }
113}
114
115impl<C, D> Writer for Interposer<'_, C, D>
116where
117    C: writer::Storage + ?Sized,
118    D: duplex::Skip<Error = Infallible> + ?Sized,
119{
120    #[inline]
121    fn read_from<R>(&mut self, reader: &mut R) -> Result<(), Error<R::Error>>
122    where
123        R: Reader + ?Sized,
124    {
125        let final_offset = reader.final_offset();
126
127        {
128            // if the storage specializes writing zero-copy Bytes/BytesMut, then just write to the
129            // receive buffer, since that's what it stores
130            let mut should_delegate = C::SPECIALIZES_BYTES || C::SPECIALIZES_BYTES_MUT;
131
132            // if the storage has no space left then write into the duplex
133            should_delegate |= !self.storage.has_remaining_capacity();
134
135            // if this packet is non-contiguous, then delegate to the wrapped writer
136            should_delegate |= reader.current_offset() != self.duplex.current_offset();
137
138            // We want to decrypt into our own buffer (`duplex`) if that's cheaper. Note that
139            // decrypt is itself an optional "free" copy. Our choices are:
140            //
141            // 1. Decrypt in-place, copy head to `storage` and tail to `duplex`
142            // 2. Decrypt to `duplex`, copy head to `storage`
143            // 3. Decrypt to `storage`
144            //
145            // 1 and 2 are always possible (duplex can take any amount of data).
146            // 1 is always more expensive than 2 in `# of bytes copied` but does potentially save
147            // allocating large buffer(s) in `duplex`, if most of the bytes go into storage.
148            //
149            // 3 is optimal and should essentially always be preferred. Maybe that should even
150            // override SPECIALIZES_BYTES/_MUT?
151            should_delegate |= self.storage.remaining_capacity() < (reader.buffered_len() / 2);
152
153            if should_delegate {
154                self.duplex.read_from(reader)?;
155
156                // don't copy into `storage` here - let the caller do that later since it can be
157                // more efficient to pull from `duplex` all in one go.
158
159                return Ok(());
160            }
161        }
162
163        debug_assert!(
164            self.storage.has_remaining_capacity(),
165            "this code should only be executed if the storage has capacity"
166        );
167
168        {
169            // track the number of consumed bytes
170            let mut reader = reader.track_read();
171
172            reader.copy_into(self.storage)?;
173
174            let write_len = reader.consumed_len();
175            let write_len = VarInt::try_from(write_len).map_err(|_| Error::OutOfRange)?;
176
177            // notify the duplex that we bypassed it and should skip
178            self.duplex
179                .skip(write_len, final_offset)
180                .map_err(Error::mapped)?;
181        }
182
183        // if we still have some remaining bytes consume the rest in the duplex
184        if !reader.buffer_is_empty() {
185            self.duplex.read_from(reader)?;
186        }
187
188        Ok(())
189    }
190}
191
192#[cfg(test)]
193mod tests {
194    use super::*;
195    use crate::{
196        buffer::{
197            reader::Reader,
198            writer::{Storage as _, Writer},
199            Reassembler,
200        },
201        stream::testing::Data,
202    };
203
204    #[test]
205    fn undersized_storage_test() {
206        let mut duplex = Reassembler::default();
207        let mut reader = Data::new(10);
208        let mut checker = reader;
209
210        let mut storage: Vec<u8> = vec![];
211        {
212            // limit the storage capacity so we force writing into the duplex
213            let mut storage = storage.with_write_limit(1);
214
215            let mut interposer = Interposer::new(&mut storage, &mut duplex);
216
217            interposer.read_from(&mut reader).unwrap();
218        }
219
220        // the storage was too small so we delegated to duplex
221        assert!(storage.is_empty());
222        assert_eq!(duplex.buffered_len(), 10);
223
224        // move the reassembled bytes into the checker
225        checker.read_from(&mut duplex).unwrap();
226        assert_eq!(duplex.current_offset().as_u64(), 10);
227        assert!(duplex.is_consumed());
228    }
229
230    #[test]
231    fn out_of_order_test() {
232        let mut duplex = Reassembler::default();
233
234        // first write 5 bytes at offset 5
235        {
236            let mut reader = Data::new(10);
237
238            // advance the reader by 5 bytes
239            let _ = reader.send_one(5);
240
241            let mut storage: Vec<u8> = vec![];
242
243            let mut interposer = Interposer::new(&mut storage, &mut duplex);
244
245            interposer.read_from(&mut reader).unwrap();
246
247            // make sure we consumed the reader
248            assert_eq!(reader.current_offset().as_u64(), 10);
249
250            assert_eq!(interposer.current_offset().as_u64(), 0);
251            assert_eq!(interposer.buffered_len(), 0);
252
253            // make sure we didn't write to the storage, even if we had capacity, since the
254            // current_offset doesn't match
255            assert!(storage.is_empty());
256        }
257
258        // then write 10 bytes at offset 0
259        {
260            let mut reader = Data::new(10);
261
262            let mut storage: Vec<u8> = vec![];
263
264            let mut interposer = Interposer::new(&mut storage, &mut duplex);
265
266            interposer.read_from(&mut reader).unwrap();
267
268            // make sure we consumed the reader
269            assert_eq!(reader.current_offset().as_u64(), 10);
270
271            assert_eq!(interposer.current_offset().as_u64(), 10);
272            assert_eq!(interposer.buffered_len(), 0);
273
274            // make sure we copied the entire reader
275            assert_eq!(storage.len(), 10);
276            assert!(duplex.is_consumed());
277        }
278    }
279
280    #[test]
281    fn skip_test() {
282        let mut duplex = Reassembler::default();
283        let mut reader = Data::new(10);
284        let mut checker = reader;
285
286        let mut storage: Vec<u8> = vec![];
287
288        let mut interposer = Interposer::new(&mut storage, &mut duplex);
289
290        interposer.read_from(&mut reader).unwrap();
291
292        assert_eq!(storage.len(), 10);
293        assert_eq!(duplex.current_offset().as_u64(), 10);
294
295        checker.receive(&[&storage[..]]);
296    }
297
298    #[test]
299    fn empty_storage_test() {
300        let mut duplex = Reassembler::default();
301        let mut reader = Data::new(10);
302        let mut checker = reader;
303
304        let mut storage = writer::storage::Empty;
305
306        let mut interposer = Interposer::new(&mut storage, &mut duplex);
307
308        interposer.read_from(&mut reader).unwrap();
309
310        assert_eq!(interposer.current_offset().as_u64(), 0);
311        assert_eq!(interposer.buffered_len(), 10);
312
313        checker.read_from(&mut interposer).unwrap();
314
315        assert_eq!(interposer.current_offset().as_u64(), 10);
316        assert!(interposer.buffer_is_empty());
317        assert_eq!(interposer.buffered_len(), 0);
318        assert!(interposer.is_consumed());
319    }
320
321    #[test]
322    fn partial_test() {
323        let mut duplex = Reassembler::default();
324        let mut reader = Data::new(10);
325        let mut checker = reader;
326
327        let mut storage: Vec<u8> = vec![];
328        {
329            let mut storage = storage.with_write_limit(9);
330
331            let mut interposer = Interposer::new(&mut storage, &mut duplex);
332
333            interposer.read_from(&mut reader).unwrap();
334        }
335
336        // the storage was at least half the reader
337        assert_eq!(storage.len(), 9);
338        assert_eq!(duplex.buffered_len(), 1);
339
340        // move the reassembled bytes into the checker
341        checker.receive(&[&storage]);
342        checker.read_from(&mut duplex).unwrap();
343        assert_eq!(duplex.current_offset().as_u64(), 10);
344        assert!(duplex.is_consumed());
345    }
346}