s2n_quic_core/buffer/duplex/
interposer.rs1use crate::{
5 buffer::{
6 duplex,
7 reader::{self, Reader, Storage as _},
8 writer::{self, Writer},
9 Error,
10 },
11 varint::VarInt,
12};
13use core::convert::Infallible;
14
15pub struct Interposer<'a, S, D>
19where
20 S: writer::Storage + ?Sized,
21 D: duplex::Skip<Error = Infallible> + ?Sized,
22{
23 storage: &'a mut S,
24 duplex: &'a mut D,
25}
26
27impl<'a, S, D> Interposer<'a, S, D>
28where
29 S: writer::Storage + ?Sized,
30 D: duplex::Skip<Error = Infallible> + ?Sized,
31{
32 #[inline]
33 pub fn new(storage: &'a mut S, duplex: &'a mut D) -> Self {
34 debug_assert!(
35 !storage.has_remaining_capacity() || duplex.buffer_is_empty(),
36 "`duplex` (len={}) should be drained into `storage` (cap={}) before constructing an Interposer",
37 duplex.buffered_len(),
38 storage.remaining_capacity()
39 );
40
41 Self { storage, duplex }
42 }
43}
44
45impl<S, D> reader::Storage for Interposer<'_, S, D>
47where
48 S: writer::Storage + ?Sized,
49 D: duplex::Skip<Error = Infallible> + ?Sized,
50{
51 type Error = D::Error;
52
53 #[inline]
54 fn buffered_len(&self) -> usize {
55 self.duplex.buffered_len()
56 }
57
58 #[inline]
59 fn buffer_is_empty(&self) -> bool {
60 self.duplex.buffer_is_empty()
61 }
62
63 #[inline]
64 fn read_chunk(&mut self, watermark: usize) -> Result<reader::storage::Chunk<'_>, Self::Error> {
65 self.duplex.read_chunk(watermark)
66 }
67
68 #[inline]
69 fn partial_copy_into<Dest>(
70 &mut self,
71 dest: &mut Dest,
72 ) -> Result<reader::storage::Chunk<'_>, Self::Error>
73 where
74 Dest: writer::Storage + ?Sized,
75 {
76 self.duplex.partial_copy_into(dest)
77 }
78
79 #[inline]
80 fn copy_into<Dest>(&mut self, dest: &mut Dest) -> Result<(), Self::Error>
81 where
82 Dest: writer::Storage + ?Sized,
83 {
84 self.duplex.copy_into(dest)
85 }
86}
87
88impl<C, D> Reader for Interposer<'_, C, D>
90where
91 C: writer::Storage + ?Sized,
92 D: duplex::Skip<Error = Infallible> + ?Sized,
93{
94 #[inline]
95 fn current_offset(&self) -> VarInt {
96 self.duplex.current_offset()
97 }
98
99 #[inline]
100 fn final_offset(&self) -> Option<VarInt> {
101 self.duplex.final_offset()
102 }
103
104 #[inline]
105 fn has_buffered_fin(&self) -> bool {
106 self.duplex.has_buffered_fin()
107 }
108
109 #[inline]
110 fn is_consumed(&self) -> bool {
111 self.duplex.is_consumed()
112 }
113}
114
115impl<C, D> Writer for Interposer<'_, C, D>
116where
117 C: writer::Storage + ?Sized,
118 D: duplex::Skip<Error = Infallible> + ?Sized,
119{
120 #[inline]
121 fn read_from<R>(&mut self, reader: &mut R) -> Result<(), Error<R::Error>>
122 where
123 R: Reader + ?Sized,
124 {
125 let final_offset = reader.final_offset();
126
127 {
128 let mut should_delegate = C::SPECIALIZES_BYTES || C::SPECIALIZES_BYTES_MUT;
131
132 should_delegate |= !self.storage.has_remaining_capacity();
134
135 should_delegate |= reader.current_offset() != self.duplex.current_offset();
137
138 should_delegate |= self.storage.remaining_capacity() < (reader.buffered_len() / 2);
140
141 if should_delegate {
142 self.duplex.read_from(reader)?;
143
144 return Ok(());
148 }
149 }
150
151 debug_assert!(
152 self.storage.has_remaining_capacity(),
153 "this code should only be executed if the storage has capacity"
154 );
155
156 {
157 let mut reader = reader.track_read();
159
160 reader.copy_into(self.storage)?;
161
162 let write_len = reader.consumed_len();
163 let write_len = VarInt::try_from(write_len).map_err(|_| Error::OutOfRange)?;
164
165 self.duplex
167 .skip(write_len, final_offset)
168 .map_err(Error::mapped)?;
169 }
170
171 if !reader.buffer_is_empty() {
173 self.duplex.read_from(reader)?;
174 }
175
176 Ok(())
177 }
178}
179
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        buffer::{
            reader::Reader,
            writer::{Storage as _, Writer},
            Reassembler,
        },
        stream::testing::Data,
    };

    /// A 1-byte write limit against a 10-byte reader: nothing reaches the
    /// storage and all 10 bytes end up buffered in the duplex.
    #[test]
    fn undersized_storage_test() {
        let mut reassembler = Reassembler::default();
        let mut source = Data::new(10);
        let mut checker = source;

        let mut output: Vec<u8> = Vec::new();
        {
            let mut limited = output.with_write_limit(1);

            Interposer::new(&mut limited, &mut reassembler)
                .read_from(&mut source)
                .unwrap();
        }

        // the storage was left untouched
        assert!(output.is_empty());
        assert_eq!(reassembler.buffered_len(), 10);

        // drain the duplex through the checker
        checker.read_from(&mut reassembler).unwrap();
        assert_eq!(reassembler.current_offset().as_u64(), 10);
        assert!(reassembler.is_consumed());
    }

    /// A reader that is not at the duplex's current offset must not be
    /// written to the storage; a later in-order reader is.
    #[test]
    fn out_of_order_test() {
        let mut reassembler = Reassembler::default();

        // first pass: a reader on which `send_one(5)` has already been called
        {
            let mut source = Data::new(10);

            // presumably moves the reader 5 bytes ahead - confirm against `Data`
            let _ = source.send_one(5);

            let mut output: Vec<u8> = Vec::new();

            let mut interposer = Interposer::new(&mut output, &mut reassembler);

            interposer.read_from(&mut source).unwrap();

            // the reader ends at offset 10
            assert_eq!(source.current_offset().as_u64(), 10);

            // the interposer's read side has not moved and has nothing readable
            assert_eq!(interposer.current_offset().as_u64(), 0);
            assert_eq!(interposer.buffered_len(), 0);

            // nothing went into the storage even though it had capacity
            assert!(output.is_empty());
        }

        // second pass: a fresh reader starting from the beginning
        {
            let mut source = Data::new(10);

            let mut output: Vec<u8> = Vec::new();

            let mut interposer = Interposer::new(&mut output, &mut reassembler);

            interposer.read_from(&mut source).unwrap();

            assert_eq!(source.current_offset().as_u64(), 10);

            // the read side advanced to 10 with nothing left buffered
            assert_eq!(interposer.current_offset().as_u64(), 10);
            assert_eq!(interposer.buffered_len(), 0);

            // all 10 bytes landed in the storage
            assert_eq!(output.len(), 10);
            assert!(reassembler.is_consumed());
        }
    }

    /// With ample storage the bytes go to the storage and the duplex's
    /// offset advances past them.
    #[test]
    fn skip_test() {
        let mut reassembler = Reassembler::default();
        let mut source = Data::new(10);
        let mut checker = source;

        let mut output: Vec<u8> = Vec::new();

        Interposer::new(&mut output, &mut reassembler)
            .read_from(&mut source)
            .unwrap();

        assert_eq!(output.len(), 10);
        assert_eq!(reassembler.current_offset().as_u64(), 10);

        // hand the written bytes to the checker
        checker.receive(&[&output[..]]);
    }

    /// With `writer::storage::Empty` as the storage everything is buffered in
    /// the duplex and can be read back through the interposer.
    #[test]
    fn empty_storage_test() {
        let mut reassembler = Reassembler::default();
        let mut source = Data::new(10);
        let mut checker = source;

        let mut sink = writer::storage::Empty;

        let mut interposer = Interposer::new(&mut sink, &mut reassembler);

        interposer.read_from(&mut source).unwrap();

        // all 10 bytes are buffered and the read offset has not moved
        assert_eq!(interposer.current_offset().as_u64(), 0);
        assert_eq!(interposer.buffered_len(), 10);

        // read everything back out through the interposer's reader side
        checker.read_from(&mut interposer).unwrap();

        assert_eq!(interposer.current_offset().as_u64(), 10);
        assert!(interposer.buffer_is_empty());
        assert_eq!(interposer.buffered_len(), 0);
        assert!(interposer.is_consumed());
    }

    /// A 9-byte write limit against a 10-byte reader: 9 bytes reach the
    /// storage and the last byte is buffered in the duplex.
    #[test]
    fn partial_test() {
        let mut reassembler = Reassembler::default();
        let mut source = Data::new(10);
        let mut checker = source;

        let mut output: Vec<u8> = Vec::new();
        {
            let mut limited = output.with_write_limit(9);

            Interposer::new(&mut limited, &mut reassembler)
                .read_from(&mut source)
                .unwrap();
        }

        // the storage took as much as its limit allowed
        assert_eq!(output.len(), 9);
        assert_eq!(reassembler.buffered_len(), 1);

        // check the storage bytes first, then the byte left in the duplex
        checker.receive(&[&output]);
        checker.read_from(&mut reassembler).unwrap();
        assert_eq!(reassembler.current_offset().as_u64(), 10);
        assert!(reassembler.is_consumed());
    }
}