// s2n_quic_core/buffer/duplex/interposer.rs
use crate::{
5 buffer::{
6 duplex,
7 reader::{self, Reader, Storage as _},
8 writer::{self, Writer},
9 Error,
10 },
11 varint::VarInt,
12};
13use core::convert::Infallible;
14
15pub struct Interposer<'a, S, D>
19where
20 S: writer::Storage + ?Sized,
21 D: duplex::Skip<Error = Infallible> + ?Sized,
22{
23 storage: &'a mut S,
24 duplex: &'a mut D,
25}
26
27impl<'a, S, D> Interposer<'a, S, D>
28where
29 S: writer::Storage + ?Sized,
30 D: duplex::Skip<Error = Infallible> + ?Sized,
31{
32 #[inline]
33 pub fn new(storage: &'a mut S, duplex: &'a mut D) -> Self {
34 debug_assert!(
35 !storage.has_remaining_capacity() || duplex.buffer_is_empty(),
36 "`duplex` (len={}) should be drained into `storage` (cap={}) before constructing an Interposer",
37 duplex.buffered_len(),
38 storage.remaining_capacity()
39 );
40
41 Self { storage, duplex }
42 }
43}
44
45impl<S, D> reader::Storage for Interposer<'_, S, D>
47where
48 S: writer::Storage + ?Sized,
49 D: duplex::Skip<Error = Infallible> + ?Sized,
50{
51 type Error = D::Error;
52
53 #[inline]
54 fn buffered_len(&self) -> usize {
55 self.duplex.buffered_len()
56 }
57
58 #[inline]
59 fn buffer_is_empty(&self) -> bool {
60 self.duplex.buffer_is_empty()
61 }
62
63 #[inline]
64 fn read_chunk(&mut self, watermark: usize) -> Result<reader::storage::Chunk<'_>, Self::Error> {
65 self.duplex.read_chunk(watermark)
66 }
67
68 #[inline]
69 fn partial_copy_into<Dest>(
70 &mut self,
71 dest: &mut Dest,
72 ) -> Result<reader::storage::Chunk<'_>, Self::Error>
73 where
74 Dest: writer::Storage + ?Sized,
75 {
76 self.duplex.partial_copy_into(dest)
77 }
78
79 #[inline]
80 fn copy_into<Dest>(&mut self, dest: &mut Dest) -> Result<(), Self::Error>
81 where
82 Dest: writer::Storage + ?Sized,
83 {
84 self.duplex.copy_into(dest)
85 }
86}
87
88impl<C, D> Reader for Interposer<'_, C, D>
90where
91 C: writer::Storage + ?Sized,
92 D: duplex::Skip<Error = Infallible> + ?Sized,
93{
94 #[inline]
95 fn current_offset(&self) -> VarInt {
96 self.duplex.current_offset()
97 }
98
99 #[inline]
100 fn final_offset(&self) -> Option<VarInt> {
101 self.duplex.final_offset()
102 }
103
104 #[inline]
105 fn has_buffered_fin(&self) -> bool {
106 self.duplex.has_buffered_fin()
107 }
108
109 #[inline]
110 fn is_consumed(&self) -> bool {
111 self.duplex.is_consumed()
112 }
113}
114
115impl<C, D> Writer for Interposer<'_, C, D>
116where
117 C: writer::Storage + ?Sized,
118 D: duplex::Skip<Error = Infallible> + ?Sized,
119{
120 #[inline]
121 fn read_from<R>(&mut self, reader: &mut R) -> Result<(), Error<R::Error>>
122 where
123 R: Reader + ?Sized,
124 {
125 let final_offset = reader.final_offset();
126
127 {
128 let mut should_delegate = C::SPECIALIZES_BYTES || C::SPECIALIZES_BYTES_MUT;
131
132 should_delegate |= !self.storage.has_remaining_capacity();
134
135 should_delegate |= reader.current_offset() != self.duplex.current_offset();
137
138 should_delegate |= self.storage.remaining_capacity() < (reader.buffered_len() / 2);
152
153 if should_delegate {
154 self.duplex.read_from(reader)?;
155
156 return Ok(());
160 }
161 }
162
163 debug_assert!(
164 self.storage.has_remaining_capacity(),
165 "this code should only be executed if the storage has capacity"
166 );
167
168 {
169 let mut reader = reader.track_read();
171
172 reader.copy_into(self.storage)?;
173
174 let write_len = reader.consumed_len();
175 let write_len = VarInt::try_from(write_len).map_err(|_| Error::OutOfRange)?;
176
177 self.duplex
179 .skip(write_len, final_offset)
180 .map_err(Error::mapped)?;
181 }
182
183 if !reader.buffer_is_empty() {
185 self.duplex.read_from(reader)?;
186 }
187
188 Ok(())
189 }
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use crate::buffer::{
        reader::Reader,
        writer::{Storage as _, Writer},
        Reassembler,
    };
    use crate::stream::testing::Data;

    /// Builds the setup shared by most tests: an empty reassembler, a 10 byte
    /// `Data` stream to read from, and a copy of that stream to check results against.
    fn fixture() -> (Reassembler, Data, Data) {
        let stream = Data::new(10);
        (Reassembler::default(), stream, stream)
    }

    #[test]
    fn undersized_storage_test() {
        let (mut duplex, mut reader, mut checker) = fixture();

        let mut storage: Vec<u8> = vec![];
        {
            // one byte of room is less than half of the 10 byte payload
            let mut limited = storage.with_write_limit(1);

            let mut interposer = Interposer::new(&mut limited, &mut duplex);

            interposer.read_from(&mut reader).unwrap();
        }

        // nothing reached the storage; the duplex buffered the whole payload
        assert!(storage.is_empty());
        assert_eq!(duplex.buffered_len(), 10);

        // drain the duplex into the checker
        checker.read_from(&mut duplex).unwrap();
        assert_eq!(duplex.current_offset().as_u64(), 10);
        assert!(duplex.is_consumed());
    }

    #[test]
    fn out_of_order_test() {
        let mut duplex = Reassembler::default();

        // first pass: the reader has already sent a chunk before the interposer sees it
        {
            let mut reader = Data::new(10);

            let _ = reader.send_one(5);

            let mut storage: Vec<u8> = vec![];

            let mut interposer = Interposer::new(&mut storage, &mut duplex);

            interposer.read_from(&mut reader).unwrap();

            // the reader was fully consumed
            assert_eq!(reader.current_offset().as_u64(), 10);

            // the interposer made no readable progress
            assert_eq!(interposer.current_offset().as_u64(), 0);
            assert_eq!(interposer.buffered_len(), 0);

            // and the storage was bypassed
            assert!(storage.is_empty());
        }

        // second pass: a fresh reader starting from the beginning
        {
            let mut reader = Data::new(10);

            let mut storage: Vec<u8> = vec![];

            let mut interposer = Interposer::new(&mut storage, &mut duplex);

            interposer.read_from(&mut reader).unwrap();

            // the reader was fully consumed
            assert_eq!(reader.current_offset().as_u64(), 10);

            // the interposer advanced without buffering anything
            assert_eq!(interposer.current_offset().as_u64(), 10);
            assert_eq!(interposer.buffered_len(), 0);

            // this time the payload landed in the storage
            assert_eq!(storage.len(), 10);
            assert!(duplex.is_consumed());
        }
    }

    #[test]
    fn skip_test() {
        let (mut duplex, mut reader, mut checker) = fixture();

        let mut storage: Vec<u8> = vec![];

        let mut interposer = Interposer::new(&mut storage, &mut duplex);

        interposer.read_from(&mut reader).unwrap();

        // the payload went to the storage and the duplex offset moved past it
        assert_eq!(storage.len(), 10);
        assert_eq!(duplex.current_offset().as_u64(), 10);

        // hand the stored bytes to the checker
        checker.receive(&[&storage[..]]);
    }

    #[test]
    fn empty_storage_test() {
        let (mut duplex, mut reader, mut checker) = fixture();

        let mut storage = writer::storage::Empty;

        let mut interposer = Interposer::new(&mut storage, &mut duplex);

        interposer.read_from(&mut reader).unwrap();

        // the whole payload is buffered behind the interposer
        assert_eq!(interposer.current_offset().as_u64(), 0);
        assert_eq!(interposer.buffered_len(), 10);

        // drain the interposer into the checker
        checker.read_from(&mut interposer).unwrap();

        assert_eq!(interposer.current_offset().as_u64(), 10);
        assert!(interposer.buffer_is_empty());
        assert_eq!(interposer.buffered_len(), 0);
        assert!(interposer.is_consumed());
    }

    #[test]
    fn partial_test() {
        let (mut duplex, mut reader, mut checker) = fixture();

        let mut storage: Vec<u8> = vec![];
        {
            // room for all but the last byte of the payload
            let mut limited = storage.with_write_limit(9);

            let mut interposer = Interposer::new(&mut limited, &mut duplex);

            interposer.read_from(&mut reader).unwrap();
        }

        // 9 bytes landed in the storage; the remaining byte was buffered in the duplex
        assert_eq!(storage.len(), 9);
        assert_eq!(duplex.buffered_len(), 1);

        // the checker takes the stored bytes first, then the rest from the duplex
        checker.receive(&[&storage]);
        checker.read_from(&mut duplex).unwrap();
        assert_eq!(duplex.current_offset().as_u64(), 10);
        assert!(duplex.is_consumed());
    }
}