s2n_quic_core/buffer/reader/
incremental.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    buffer::{
6        reader::{storage::Chunk, Reader, Storage},
7        writer, Error,
8    },
9    ensure,
10    varint::VarInt,
11};
12
13/// Implements an incremental [`Reader`] that joins to temporary [`Storage`] as the stream data
14///
15/// This is useful for scenarios where the stream isn't completely buffered in memory and
16/// data come in gradually.
17#[derive(Debug, Default)]
18pub struct Incremental {
19    current_offset: VarInt,
20    final_offset: Option<VarInt>,
21}
22
23impl Incremental {
24    #[inline]
25    pub fn new(current_offset: VarInt) -> Self {
26        Self {
27            current_offset,
28            final_offset: None,
29        }
30    }
31
32    #[inline]
33    pub fn with_storage<'a, C: Storage>(
34        &'a mut self,
35        storage: &'a mut C,
36        is_fin: bool,
37    ) -> Result<WithStorage<'a, C>, Error> {
38        let mut storage = WithStorage {
39            incremental: self,
40            storage,
41        };
42
43        if is_fin {
44            storage.set_fin()?;
45        } else {
46            ensure!(
47                storage.incremental.final_offset.is_none(),
48                Err(Error::InvalidFin)
49            );
50        }
51
52        Ok(storage)
53    }
54
55    #[inline]
56    pub fn current_offset(&self) -> VarInt {
57        self.current_offset
58    }
59
60    #[inline]
61    pub fn final_offset(&self) -> Option<VarInt> {
62        self.final_offset
63    }
64}
65
66pub struct WithStorage<'a, C: Storage> {
67    incremental: &'a mut Incremental,
68    storage: &'a mut C,
69}
70
71impl<C: Storage> WithStorage<'_, C> {
72    #[inline]
73    pub fn set_fin(&mut self) -> Result<&mut Self, Error> {
74        let final_offset = self
75            .incremental
76            .current_offset
77            .checked_add_usize(self.buffered_len())
78            .ok_or(Error::OutOfRange)?;
79
80        // make sure the final length doesn't change
81        if let Some(current) = self.incremental.final_offset {
82            ensure!(final_offset == current, Err(Error::InvalidFin));
83        }
84
85        self.incremental.final_offset = Some(final_offset);
86
87        Ok(self)
88    }
89}
90
91impl<C: Storage> Storage for WithStorage<'_, C> {
92    type Error = C::Error;
93
94    #[inline]
95    fn buffered_len(&self) -> usize {
96        self.storage.buffered_len()
97    }
98
99    #[inline]
100    fn read_chunk(&mut self, watermark: usize) -> Result<Chunk, Self::Error> {
101        let chunk = self.storage.read_chunk(watermark)?;
102        self.incremental.current_offset += chunk.len();
103        Ok(chunk)
104    }
105
106    #[inline]
107    fn partial_copy_into<Dest>(&mut self, dest: &mut Dest) -> Result<Chunk, Self::Error>
108    where
109        Dest: writer::Storage + ?Sized,
110    {
111        let mut dest = dest.track_write();
112        let chunk = self.storage.partial_copy_into(&mut dest)?;
113        self.incremental.current_offset += chunk.len();
114        self.incremental.current_offset += dest.written_len();
115        Ok(chunk)
116    }
117
118    #[inline]
119    fn copy_into<Dest>(&mut self, dest: &mut Dest) -> Result<(), Self::Error>
120    where
121        Dest: writer::Storage + ?Sized,
122    {
123        let mut dest = dest.track_write();
124        self.storage.copy_into(&mut dest)?;
125        self.incremental.current_offset += dest.written_len();
126        Ok(())
127    }
128}
129
130impl<C: Storage> Reader for WithStorage<'_, C> {
131    #[inline]
132    fn current_offset(&self) -> VarInt {
133        self.incremental.current_offset()
134    }
135
136    #[inline]
137    fn final_offset(&self) -> Option<VarInt> {
138        self.incremental.final_offset()
139    }
140}
141
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds two pieces of data through one tracker and checks the offset bookkeeping.
    #[test]
    fn incremental_test() {
        let mut incremental = Incremental::default();

        // a default tracker starts at zero without a final offset
        assert_eq!(incremental.current_offset(), VarInt::ZERO);
        assert_eq!(incremental.final_offset(), None);

        // first piece: no fin, consumed through `partial_copy_into`
        {
            let mut input: &[u8] = &[1, 2, 3, 4];
            let mut joined = incremental.with_storage(&mut input, false).unwrap();
            let mut reader = joined.with_checks();

            assert_eq!(reader.buffered_len(), 4);

            let mut dest: &mut [u8] = &mut [0; 4];
            let trailing_chunk = reader.partial_copy_into(&mut dest).unwrap();
            assert_eq!(&*trailing_chunk, &[1, 2, 3, 4]);

            assert_eq!(reader.buffered_len(), 0);
        }

        // the tracker remembers how far the first piece advanced it
        assert_eq!(incremental.current_offset(), VarInt::from_u8(4));

        // second piece: carries fin, consumed through `read_chunk`
        {
            let mut input: &[u8] = &[5, 6, 7, 8];
            let mut joined = incremental.with_storage(&mut input, true).unwrap();
            let mut reader = joined.with_checks();

            assert_eq!(reader.buffered_len(), 4);

            let trailing_chunk = reader.read_chunk(usize::MAX).unwrap();
            assert_eq!(&*trailing_chunk, &[5, 6, 7, 8]);

            assert_eq!(reader.buffered_len(), 0);
            assert!(reader.buffer_is_empty());
            assert!(reader.is_consumed());
        }

        // `new` starts at the requested offset
        let incremental = Incremental::new(VarInt::from_u8(100));
        assert_eq!(incremental.current_offset(), VarInt::from_u8(100));
    }
}
187}