s2n_quic_core/buffer/duplex/
interposer.rs

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    buffer::{
6        duplex,
7        reader::{self, Reader, Storage as _},
8        writer::{self, Writer},
9        Error,
10    },
11    varint::VarInt,
12};
13use core::convert::Infallible;
14
15/// A wrapper around an underlying buffer (`duplex`) which will prefer to read/write from a
16/// user-provided temporary buffer (`storage`). The underlying buffer (`duplex`)'s current
17/// position and total length are updated if needed.
18pub struct Interposer<'a, S, D>
19where
20    S: writer::Storage + ?Sized,
21    D: duplex::Skip<Error = Infallible> + ?Sized,
22{
23    storage: &'a mut S,
24    duplex: &'a mut D,
25}
26
27impl<'a, S, D> Interposer<'a, S, D>
28where
29    S: writer::Storage + ?Sized,
30    D: duplex::Skip<Error = Infallible> + ?Sized,
31{
32    #[inline]
33    pub fn new(storage: &'a mut S, duplex: &'a mut D) -> Self {
34        debug_assert!(
35            !storage.has_remaining_capacity() || duplex.buffer_is_empty(),
36            "`duplex` (len={}) should be drained into `storage` (cap={}) before constructing an Interposer",
37            duplex.buffered_len(),
38            storage.remaining_capacity()
39        );
40
41        Self { storage, duplex }
42    }
43}
44
45/// Delegates to the inner Duplex
46impl<S, D> reader::Storage for Interposer<'_, S, D>
47where
48    S: writer::Storage + ?Sized,
49    D: duplex::Skip<Error = Infallible> + ?Sized,
50{
51    type Error = D::Error;
52
53    #[inline]
54    fn buffered_len(&self) -> usize {
55        self.duplex.buffered_len()
56    }
57
58    #[inline]
59    fn buffer_is_empty(&self) -> bool {
60        self.duplex.buffer_is_empty()
61    }
62
63    #[inline]
64    fn read_chunk(&mut self, watermark: usize) -> Result<reader::storage::Chunk<'_>, Self::Error> {
65        self.duplex.read_chunk(watermark)
66    }
67
68    #[inline]
69    fn partial_copy_into<Dest>(
70        &mut self,
71        dest: &mut Dest,
72    ) -> Result<reader::storage::Chunk<'_>, Self::Error>
73    where
74        Dest: writer::Storage + ?Sized,
75    {
76        self.duplex.partial_copy_into(dest)
77    }
78
79    #[inline]
80    fn copy_into<Dest>(&mut self, dest: &mut Dest) -> Result<(), Self::Error>
81    where
82        Dest: writer::Storage + ?Sized,
83    {
84        self.duplex.copy_into(dest)
85    }
86}
87
88/// Delegates to the inner Duplex
89impl<C, D> Reader for Interposer<'_, C, D>
90where
91    C: writer::Storage + ?Sized,
92    D: duplex::Skip<Error = Infallible> + ?Sized,
93{
94    #[inline]
95    fn current_offset(&self) -> VarInt {
96        self.duplex.current_offset()
97    }
98
99    #[inline]
100    fn final_offset(&self) -> Option<VarInt> {
101        self.duplex.final_offset()
102    }
103
104    #[inline]
105    fn has_buffered_fin(&self) -> bool {
106        self.duplex.has_buffered_fin()
107    }
108
109    #[inline]
110    fn is_consumed(&self) -> bool {
111        self.duplex.is_consumed()
112    }
113}
114
115impl<C, D> Writer for Interposer<'_, C, D>
116where
117    C: writer::Storage + ?Sized,
118    D: duplex::Skip<Error = Infallible> + ?Sized,
119{
120    #[inline]
121    fn read_from<R>(&mut self, reader: &mut R) -> Result<(), Error<R::Error>>
122    where
123        R: Reader + ?Sized,
124    {
125        let final_offset = reader.final_offset();
126
127        {
128            // if the storage specializes writing zero-copy Bytes/BytesMut, then just write to the
129            // receive buffer, since that's what it stores
130            let mut should_delegate = C::SPECIALIZES_BYTES || C::SPECIALIZES_BYTES_MUT;
131
132            // if the storage has no space left then write into the duplex
133            should_delegate |= !self.storage.has_remaining_capacity();
134
135            // if this packet is non-contiguous, then delegate to the wrapped writer
136            should_delegate |= reader.current_offset() != self.duplex.current_offset();
137
138            // if the storage has less than half of the payload, then delegate
139            should_delegate |= self.storage.remaining_capacity() < (reader.buffered_len() / 2);
140
141            if should_delegate {
142                self.duplex.read_from(reader)?;
143
144                // don't copy into `storage` here - let the caller do that later since it can be
145                // more efficient to pull from `duplex` all in one go.
146
147                return Ok(());
148            }
149        }
150
151        debug_assert!(
152            self.storage.has_remaining_capacity(),
153            "this code should only be executed if the storage has capacity"
154        );
155
156        {
157            // track the number of consumed bytes
158            let mut reader = reader.track_read();
159
160            reader.copy_into(self.storage)?;
161
162            let write_len = reader.consumed_len();
163            let write_len = VarInt::try_from(write_len).map_err(|_| Error::OutOfRange)?;
164
165            // notify the duplex that we bypassed it and should skip
166            self.duplex
167                .skip(write_len, final_offset)
168                .map_err(Error::mapped)?;
169        }
170
171        // if we still have some remaining bytes consume the rest in the duplex
172        if !reader.buffer_is_empty() {
173            self.duplex.read_from(reader)?;
174        }
175
176        Ok(())
177    }
178}
179
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        buffer::{
            reader::Reader,
            writer::{Storage as _, Writer},
            Reassembler,
        },
        stream::testing::Data,
    };

    /// A storage much smaller than the payload sends the entire write to the duplex
    #[test]
    fn undersized_storage_test() {
        let mut reassembler = Reassembler::default();
        let mut source = Data::new(10);
        let mut checker = source;

        let mut sink: Vec<u8> = vec![];
        {
            // a single byte of capacity is less than half the payload, forcing delegation
            let mut limited = sink.with_write_limit(1);

            let mut interposer = Interposer::new(&mut limited, &mut reassembler);

            interposer.read_from(&mut source).unwrap();
        }

        // nothing reached the sink; all 10 bytes are buffered in the reassembler
        assert!(sink.is_empty());
        assert_eq!(reassembler.buffered_len(), 10);

        // drain the reassembler into the checker
        checker.read_from(&mut reassembler).unwrap();
        assert_eq!(reassembler.current_offset().as_u64(), 10);
        assert!(reassembler.is_consumed());
    }

    /// A non-contiguous write is delegated; a later contiguous write lands in the storage
    #[test]
    fn out_of_order_test() {
        let mut reassembler = Reassembler::default();

        // step 1: write the 5 bytes that live at offset 5
        {
            let mut source = Data::new(10);

            // move the source forward so it starts at offset 5
            let _ = source.send_one(5);

            let mut sink: Vec<u8> = vec![];

            let mut interposer = Interposer::new(&mut sink, &mut reassembler);

            interposer.read_from(&mut source).unwrap();

            // the source was fully consumed
            assert_eq!(source.current_offset().as_u64(), 10);

            assert_eq!(interposer.current_offset().as_u64(), 0);
            assert_eq!(interposer.buffered_len(), 0);

            // the sink had room, but the offsets did not line up so it must stay untouched
            assert!(sink.is_empty());
        }

        // step 2: write all 10 bytes starting at offset 0
        {
            let mut source = Data::new(10);

            let mut sink: Vec<u8> = vec![];

            let mut interposer = Interposer::new(&mut sink, &mut reassembler);

            interposer.read_from(&mut source).unwrap();

            // the source was fully consumed
            assert_eq!(source.current_offset().as_u64(), 10);

            assert_eq!(interposer.current_offset().as_u64(), 10);
            assert_eq!(interposer.buffered_len(), 0);

            // the whole payload was copied into the sink
            assert_eq!(sink.len(), 10);
            assert!(reassembler.is_consumed());
        }
    }

    /// A contiguous write into a roomy storage bypasses the duplex, which only advances
    #[test]
    fn skip_test() {
        let mut reassembler = Reassembler::default();
        let mut source = Data::new(10);
        let mut checker = source;

        let mut sink: Vec<u8> = vec![];

        let mut interposer = Interposer::new(&mut sink, &mut reassembler);

        interposer.read_from(&mut source).unwrap();

        assert_eq!(sink.len(), 10);
        assert_eq!(reassembler.current_offset().as_u64(), 10);

        checker.receive(&[&sink[..]]);
    }

    /// With an `Empty` storage everything is buffered in, and later read from, the duplex
    #[test]
    fn empty_storage_test() {
        let mut reassembler = Reassembler::default();
        let mut source = Data::new(10);
        let mut checker = source;

        let mut sink = writer::storage::Empty;

        let mut interposer = Interposer::new(&mut sink, &mut reassembler);

        interposer.read_from(&mut source).unwrap();

        assert_eq!(interposer.current_offset().as_u64(), 0);
        assert_eq!(interposer.buffered_len(), 10);

        checker.read_from(&mut interposer).unwrap();

        assert_eq!(interposer.current_offset().as_u64(), 10);
        assert!(interposer.buffer_is_empty());
        assert_eq!(interposer.buffered_len(), 0);
        assert!(interposer.is_consumed());
    }

    /// A storage holding most of the payload takes what fits; the rest goes to the duplex
    #[test]
    fn partial_test() {
        let mut reassembler = Reassembler::default();
        let mut source = Data::new(10);
        let mut checker = source;

        let mut sink: Vec<u8> = vec![];
        {
            let mut limited = sink.with_write_limit(9);

            let mut interposer = Interposer::new(&mut limited, &mut reassembler);

            interposer.read_from(&mut source).unwrap();
        }

        // 9 bytes of capacity is at least half the payload, so the sink was filled first
        assert_eq!(sink.len(), 9);
        assert_eq!(reassembler.buffered_len(), 1);

        // feed the checker from the sink, then from the reassembler
        checker.receive(&[&sink]);
        checker.read_from(&mut reassembler).unwrap();
        assert_eq!(reassembler.current_offset().as_u64(), 10);
        assert!(reassembler.is_consumed());
    }
}