s2n_quic_core/frame/
stream.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    frame::{FitError, Tag},
6    varint::VarInt,
7};
8use core::{convert::TryFrom, mem::size_of};
9use s2n_codec::{
10    decoder_parameterized_value, DecoderBuffer, DecoderBufferMut, Encoder, EncoderValue,
11};
12
13//= https://www.rfc-editor.org/rfc/rfc9000#section-19.8
14//# STREAM frames implicitly create a stream and carry stream data.  The
15//# Type field in the STREAM frame takes the form 0b00001XXX (or the set
16//# of values from 0x08 to 0x0f).
17
18macro_rules! stream_tag {
19    () => {
20        0x08u8..=0x0fu8
21    };
22}
23
24const STREAM_TAG: u8 = 0x08;
25
26//= https://www.rfc-editor.org/rfc/rfc9000#section-19.8
27//# *  The OFF bit (0x04) in the frame type is set to indicate that there
28//#    is an Offset field present.  When set to 1, the Offset field is
29//#    present.  When set to 0, the Offset field is absent and the Stream
30//#    Data starts at an offset of 0 (that is, the frame contains the
31//#    first bytes of the stream, or the end of a stream that includes no
32//#    data).
33
34const OFF_BIT: u8 = 0x04;
35
36//= https://www.rfc-editor.org/rfc/rfc9000#section-19.8
37//# *  The LEN bit (0x02) in the frame type is set to indicate that there
38//#    is a Length field present.  If this bit is set to 0, the Length
39//#    field is absent and the Stream Data field extends to the end of
40//#    the packet.  If this bit is set to 1, the Length field is present.
41
42const LEN_BIT: u8 = 0x02;
43
44//= https://www.rfc-editor.org/rfc/rfc9000#section-19.8
45//# *  The FIN bit (0x01) indicates that the frame marks the end of the
46//#    stream.  The final size of the stream is the sum of the offset and
47//#    the length of this frame.
48
49const FIN_BIT: u8 = 0x01;
50
51//= https://www.rfc-editor.org/rfc/rfc9000#section-19.8
52//# STREAM Frame {
53//#   Type (i) = 0x08..0x0f,
54//#   Stream ID (i),
55//#   [Offset (i)],
56//#   [Length (i)],
57//#   Stream Data (..),
58//# }
59
60//= https://www.rfc-editor.org/rfc/rfc9000#section-19.8
61//# STREAM frames contain the following fields:
62//#
63//# Stream ID:  A variable-length integer indicating the stream ID of the
64//#    stream; see Section 2.1.
65//#
66//# Offset:  A variable-length integer specifying the byte offset in the
67//#    stream for the data in this STREAM frame.  This field is present
68//#    when the OFF bit is set to 1.  When the Offset field is absent,
69//#    the offset is 0.
70//#
71//# Length:  A variable-length integer specifying the length of the
72//#    Stream Data field in this STREAM frame.  This field is present
73//#    when the LEN bit is set to 1.  When the LEN bit is set to 0, the
74//#    Stream Data field consumes all the remaining bytes in the packet.
75//#
76//# Stream Data:  The bytes from the designated stream to be delivered.
77
78#[derive(Debug, PartialEq, Eq)]
79pub struct Stream<Data> {
80    /// A variable-length integer indicating the stream ID of the stream
81    pub stream_id: VarInt,
82
83    /// A variable-length integer specifying the byte offset in the
84    /// stream for the data in this STREAM frame.
85    pub offset: VarInt,
86
87    /// If true, the frame is the last frame in the payload
88    pub is_last_frame: bool,
89
90    /// If true, the frame marks the end of the stream.
91    pub is_fin: bool,
92
93    /// The bytes from the designated stream to be delivered.
94    pub data: Data,
95}
96
97pub type StreamRef<'a> = Stream<&'a [u8]>;
98pub type StreamMut<'a> = Stream<&'a mut [u8]>;
99
100impl<Data> Stream<Data> {
101    #[inline]
102    pub fn tag(&self) -> u8 {
103        let mut tag: u8 = STREAM_TAG;
104
105        if *self.offset != 0 {
106            tag |= OFF_BIT;
107        }
108
109        if !self.is_last_frame {
110            tag |= LEN_BIT;
111        }
112
113        if self.is_fin {
114            tag |= FIN_BIT;
115        }
116
117        tag
118    }
119
120    /// Converts the stream data from one type to another
121    #[inline]
122    pub fn map_data<F: FnOnce(Data) -> Out, Out>(self, map: F) -> Stream<Out> {
123        Stream {
124            stream_id: self.stream_id,
125            offset: self.offset,
126            is_last_frame: self.is_last_frame,
127            is_fin: self.is_fin,
128            data: map(self.data),
129        }
130    }
131}
132
133impl<Data: EncoderValue> Stream<Data> {
134    /// Tries to fit the frame into the provided capacity
135    ///
136    /// The `is_last_frame` field will be updated with this call.
137    ///
138    /// If ok, the new payload length is returned, otherwise the frame cannot
139    /// fit.
140    #[inline]
141    pub fn try_fit(&mut self, capacity: usize) -> Result<usize, FitError> {
142        let mut fixed_len = 0;
143        fixed_len += size_of::<Tag>();
144        fixed_len += self.stream_id.encoding_size();
145
146        if self.offset != 0u64 {
147            fixed_len += self.offset.encoding_size();
148        }
149
150        let remaining_capacity = capacity.checked_sub(fixed_len).ok_or(FitError)?;
151
152        let data_len = self.data.encoding_size();
153        let max_data_len = remaining_capacity.min(data_len);
154
155        // If data fits exactly into the capacity, mark it as the last frame
156        if max_data_len == remaining_capacity {
157            self.is_last_frame = true;
158            return Ok(max_data_len);
159        }
160
161        self.is_last_frame = false;
162
163        // Compute the maximum length prefix size we would need
164        let len_prefix_size = VarInt::try_from(max_data_len)
165            .map_err(|_| FitError)?
166            .encoding_size();
167
168        // Subtract the maximum length prefix size from the remaining capacity
169        //
170        // NOTE: It's possible that this result isn't completely optimal in every case. However,
171        //       instead of spending extra cycles fitting a couple of bytes into the frame, it's
172        //       good enough in most cases.
173        let prefixed_data_len = remaining_capacity
174            .checked_sub(len_prefix_size)
175            .ok_or(FitError)?;
176
177        let data_len = prefixed_data_len.min(data_len);
178
179        Ok(data_len)
180    }
181}
182
183decoder_parameterized_value!(
184    impl<'a, Data> Stream<Data> {
185        fn decode(tag: Tag, buffer: Buffer) -> Result<Self> {
186            let has_offset = tag & OFF_BIT == OFF_BIT;
187            let is_last_frame = tag & LEN_BIT != LEN_BIT;
188            let is_fin = tag & FIN_BIT == FIN_BIT;
189
190            let (stream_id, buffer) = buffer.decode()?;
191
192            let (offset, buffer) = if has_offset {
193                buffer.decode()?
194            } else {
195                (Default::default(), buffer)
196            };
197
198            let (data, buffer) = if !is_last_frame {
199                let (data, buffer) = buffer.decode_with_len_prefix::<VarInt, Data>()?;
200                (data, buffer)
201            } else {
202                let len = buffer.len();
203                let (data, buffer) = buffer.decode_slice(len)?;
204                let (data, remaining) = data.decode()?;
205                remaining.ensure_empty()?;
206                (data, buffer)
207            };
208
209            let frame = Stream {
210                stream_id,
211                offset,
212                is_last_frame,
213                is_fin,
214                data,
215            };
216
217            Ok((frame, buffer))
218        }
219    }
220);
221
222impl<Data: EncoderValue> EncoderValue for Stream<Data> {
223    #[inline]
224    fn encode<E: Encoder>(&self, buffer: &mut E) {
225        buffer.encode(&self.tag());
226        buffer.encode(&self.stream_id);
227
228        if *self.offset != 0 {
229            buffer.encode(&self.offset);
230        }
231
232        if self.is_last_frame {
233            buffer.encode(&self.data);
234        } else {
235            buffer.encode_with_len_prefix::<VarInt, _>(&self.data);
236        }
237    }
238
239    /// We hand optimize this encoding size so we can quickly estimate
240    /// how large a STREAM frame will be
241    #[inline]
242    fn encoding_size_for_encoder<E: Encoder>(&self, encoder: &E) -> usize {
243        let mut len = 0;
244        len += size_of::<Tag>();
245        len += self.stream_id.encoding_size();
246
247        if *self.offset != 0 {
248            len += self.offset.encoding_size();
249        }
250
251        let data_len = self.data.encoding_size_for_encoder(encoder);
252        len += data_len;
253
254        // include the len prefix
255        if !self.is_last_frame {
256            len += VarInt::try_from(data_len).unwrap().encoding_size();
257        }
258
259        // make sure the encoding size matches what we would actually encode
260        if cfg!(debug_assertions) {
261            use s2n_codec::EncoderLenEstimator;
262
263            let mut estimator = EncoderLenEstimator::new(encoder.remaining_capacity());
264            self.encode(&mut estimator);
265            assert_eq!(estimator.len(), len);
266        }
267
268        len
269    }
270}
271
272impl<'a> From<Stream<DecoderBuffer<'a>>> for StreamRef<'a> {
273    #[inline]
274    fn from(s: Stream<DecoderBuffer<'a>>) -> Self {
275        s.map_data(|data| data.into_less_safe_slice())
276    }
277}
278
279impl<'a> From<Stream<DecoderBufferMut<'a>>> for StreamRef<'a> {
280    #[inline]
281    fn from(s: Stream<DecoderBufferMut<'a>>) -> Self {
282        s.map_data(|data| &*data.into_less_safe_slice())
283    }
284}
285
286impl<'a> From<Stream<DecoderBufferMut<'a>>> for StreamMut<'a> {
287    #[inline]
288    fn from(s: Stream<DecoderBufferMut<'a>>) -> Self {
289        s.map_data(|data| data.into_less_safe_slice())
290    }
291}
292
293#[cfg(test)]
294mod tests {
295    use super::*;
296    use crate::frame::Padding;
297    use bolero::check;
298    use core::convert::TryInto;
299
300    fn model(stream_id: VarInt, offset: VarInt, length: VarInt, capacity: usize) {
301        let length = if let Ok(length) = VarInt::try_into(length) {
302            length
303        } else {
304            // if the length cannot be represented by `usize` then bail
305            return;
306        };
307
308        let mut frame = Stream {
309            stream_id,
310            offset,
311            is_last_frame: false,
312            is_fin: false,
313            data: Padding { length },
314        };
315
316        if let Ok(new_length) = frame.try_fit(capacity) {
317            frame.data = Padding { length: new_length };
318
319            // we should never exceed the capacity
320            assert!(
321                frame.encoding_size() <= capacity,
322                "the encoding_size should not exceed capacity {frame:#?}"
323            );
324
325            if new_length < length {
326                let mut min = capacity;
327
328                // allow the payload to be smaller by the encoding size of the length prefix
329                if !frame.is_last_frame {
330                    min -= VarInt::try_from(new_length).unwrap().encoding_size();
331                }
332
333                // the payload was trimmed so we should be at capacity
334                let max = capacity;
335
336                assert!(
337                    (min..=max).contains(&frame.encoding_size()),
338                    "encoding_size ({}) should match capacity ({capacity}) {frame:#?}",
339                    frame.encoding_size(),
340                );
341            }
342
343            if frame.is_last_frame {
344                // the `is_last_frame` should _only_ be set when the encoding size == capacity
345                assert_eq!(
346                    frame.encoding_size(),
347                    capacity,
348                    "should only be the last frame if == capacity {frame:#?}"
349                );
350            }
351        } else {
352            assert!(
353                frame.encoding_size() > capacity,
354                "rejection should only occur when encoding size > capacity {frame:#?}"
355            );
356        }
357    }
358
359    #[test]
360    #[cfg_attr(kani, kani::proof, kani::unwind(1), kani::solver(kissat))]
361    fn try_fit_test() {
362        check!()
363            .with_type()
364            .cloned()
365            .for_each(|(stream_id, offset, length, capacity)| {
366                model(stream_id, offset, length, capacity);
367            });
368    }
369}