s2n_quic_core/buffer/reader/
incremental.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    buffer::{
6        reader::{storage::Chunk, Reader, Storage},
7        writer, Error,
8    },
9    varint::VarInt,
10};
11
12/// Implements an incremental [`Reader`] that joins to temporary [`Storage`] as the stream data
13///
14/// This is useful for scenarios where the stream isn't completely buffered in memory and
15/// data come in gradually.
16#[derive(Debug, Default)]
17pub struct Incremental {
18    current_offset: VarInt,
19    final_offset: Option<VarInt>,
20}
21
22impl Incremental {
23    #[inline]
24    pub fn new(current_offset: VarInt) -> Self {
25        Self {
26            current_offset,
27            final_offset: None,
28        }
29    }
30
31    #[inline]
32    pub fn with_storage<'a, C: Storage>(
33        &'a mut self,
34        storage: &'a mut C,
35        is_fin: bool,
36    ) -> Result<WithStorage<'a, C>, Error> {
37        let mut storage = WithStorage {
38            incremental: self,
39            storage,
40        };
41
42        if is_fin {
43            storage.set_fin()?;
44        } else {
45            ensure!(
46                storage.incremental.final_offset.is_none(),
47                Err(Error::InvalidFin)
48            );
49        }
50
51        Ok(storage)
52    }
53
54    #[inline]
55    pub fn current_offset(&self) -> VarInt {
56        self.current_offset
57    }
58
59    #[inline]
60    pub fn final_offset(&self) -> Option<VarInt> {
61        self.final_offset
62    }
63}
64
65pub struct WithStorage<'a, C: Storage> {
66    incremental: &'a mut Incremental,
67    storage: &'a mut C,
68}
69
70impl<C: Storage> WithStorage<'_, C> {
71    #[inline]
72    pub fn set_fin(&mut self) -> Result<&mut Self, Error> {
73        let final_offset = self
74            .incremental
75            .current_offset
76            .checked_add_usize(self.buffered_len())
77            .ok_or(Error::OutOfRange)?;
78
79        // make sure the final length doesn't change
80        if let Some(current) = self.incremental.final_offset {
81            ensure!(final_offset == current, Err(Error::InvalidFin));
82        }
83
84        self.incremental.final_offset = Some(final_offset);
85
86        Ok(self)
87    }
88}
89
90impl<C: Storage> Storage for WithStorage<'_, C> {
91    type Error = C::Error;
92
93    #[inline]
94    fn buffered_len(&self) -> usize {
95        self.storage.buffered_len()
96    }
97
98    #[inline]
99    fn read_chunk(&mut self, watermark: usize) -> Result<Chunk<'_>, Self::Error> {
100        let chunk = self.storage.read_chunk(watermark)?;
101        self.incremental.current_offset += chunk.len();
102        Ok(chunk)
103    }
104
105    #[inline]
106    fn partial_copy_into<Dest>(&mut self, dest: &mut Dest) -> Result<Chunk<'_>, Self::Error>
107    where
108        Dest: writer::Storage + ?Sized,
109    {
110        let mut dest = dest.track_write();
111        let chunk = self.storage.partial_copy_into(&mut dest)?;
112        self.incremental.current_offset += chunk.len();
113        self.incremental.current_offset += dest.written_len();
114        Ok(chunk)
115    }
116
117    #[inline]
118    fn copy_into<Dest>(&mut self, dest: &mut Dest) -> Result<(), Self::Error>
119    where
120        Dest: writer::Storage + ?Sized,
121    {
122        let mut dest = dest.track_write();
123        self.storage.copy_into(&mut dest)?;
124        self.incremental.current_offset += dest.written_len();
125        Ok(())
126    }
127}
128
129impl<C: Storage> Reader for WithStorage<'_, C> {
130    #[inline]
131    fn current_offset(&self) -> VarInt {
132        self.incremental.current_offset()
133    }
134
135    #[inline]
136    fn final_offset(&self) -> Option<VarInt> {
137        self.incremental.final_offset()
138    }
139}
140
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds two consecutive four-byte chunks through a single `Incremental`, the second one
    /// carrying the fin, and checks the reported offsets and buffered lengths along the way
    #[test]
    fn incremental_test() {
        let mut incremental = Incremental::default();

        // a default instance starts at offset zero without a final offset
        assert_eq!(incremental.current_offset(), VarInt::ZERO);
        assert_eq!(incremental.final_offset(), None);

        // first chunk: no fin, consumed with `partial_copy_into`
        {
            let mut data: &[u8] = &[1, 2, 3, 4];
            let mut inner = incremental.with_storage(&mut data, false).unwrap();
            let mut reader = inner.with_checks();

            assert_eq!(reader.buffered_len(), 4);

            let mut sink: &mut [u8] = &mut [0; 4];
            let tail = reader.partial_copy_into(&mut sink).unwrap();
            assert_eq!(&*tail, &[1, 2, 3, 4]);

            assert_eq!(reader.buffered_len(), 0);
        }

        // the four bytes read above are reflected in the offset
        assert_eq!(incremental.current_offset(), VarInt::from_u8(4));

        // second chunk: carries the fin, consumed with `read_chunk`
        {
            let mut data: &[u8] = &[5, 6, 7, 8];
            let mut inner = incremental.with_storage(&mut data, true).unwrap();
            let mut reader = inner.with_checks();

            assert_eq!(reader.buffered_len(), 4);

            let tail = reader.read_chunk(usize::MAX).unwrap();
            assert_eq!(&*tail, &[5, 6, 7, 8]);

            assert_eq!(reader.buffered_len(), 0);
            assert!(reader.buffer_is_empty());
            assert!(reader.is_consumed());
        }

        // `new` starts at the requested offset
        let offset_state = Incremental::new(VarInt::from_u8(100));
        assert_eq!(offset_state.current_offset(), VarInt::from_u8(100));
    }
}