s2n_quic_core/frame/
stream.rs1use crate::{
5 frame::{FitError, Tag},
6 varint::VarInt,
7};
8use core::{convert::TryFrom, mem::size_of};
9use s2n_codec::{
10 decoder_parameterized_value, DecoderBuffer, DecoderBufferMut, Encoder, EncoderValue,
11};
12
/// Expands to a range pattern matching every STREAM frame tag byte.
///
/// The range covers `0x08..=0x0f`: the fixed `STREAM_TAG` prefix (`0b0000_1000`)
/// combined with any setting of the three low flag bits (`OFF_BIT`, `LEN_BIT`,
/// `FIN_BIT`).
macro_rules! stream_tag {
    () => {
        0b0000_1000u8..=0b0000_1111u8
    };
}
23
/// Base tag shared by every STREAM frame; the three low bits carry the flags below.
const STREAM_TAG: u8 = 0b0000_1000;

/// Set in the tag when the frame carries an explicit (non-zero) offset field.
const OFF_BIT: u8 = 0b0000_0100;

/// Set in the tag when the frame data is preceded by a length prefix,
/// i.e. when the frame is *not* the last frame in the packet.
const LEN_BIT: u8 = 0b0000_0010;

/// Set in the tag when the frame marks the end of the stream.
const FIN_BIT: u8 = 0b0000_0001;
50
51#[derive(Debug, PartialEq, Eq)]
79pub struct Stream<Data> {
80 pub stream_id: VarInt,
82
83 pub offset: VarInt,
86
87 pub is_last_frame: bool,
89
90 pub is_fin: bool,
92
93 pub data: Data,
95}
96
97pub type StreamRef<'a> = Stream<&'a [u8]>;
98pub type StreamMut<'a> = Stream<&'a mut [u8]>;
99
100impl<Data> Stream<Data> {
101 #[inline]
102 pub fn tag(&self) -> u8 {
103 let mut tag: u8 = STREAM_TAG;
104
105 if *self.offset != 0 {
106 tag |= OFF_BIT;
107 }
108
109 if !self.is_last_frame {
110 tag |= LEN_BIT;
111 }
112
113 if self.is_fin {
114 tag |= FIN_BIT;
115 }
116
117 tag
118 }
119
120 #[inline]
122 pub fn map_data<F: FnOnce(Data) -> Out, Out>(self, map: F) -> Stream<Out> {
123 Stream {
124 stream_id: self.stream_id,
125 offset: self.offset,
126 is_last_frame: self.is_last_frame,
127 is_fin: self.is_fin,
128 data: map(self.data),
129 }
130 }
131}
132
133impl<Data: EncoderValue> Stream<Data> {
134 #[inline]
141 pub fn try_fit(&mut self, capacity: usize) -> Result<usize, FitError> {
142 let mut fixed_len = 0;
143 fixed_len += size_of::<Tag>();
144 fixed_len += self.stream_id.encoding_size();
145
146 if self.offset != 0u64 {
147 fixed_len += self.offset.encoding_size();
148 }
149
150 let remaining_capacity = capacity.checked_sub(fixed_len).ok_or(FitError)?;
151
152 let data_len = self.data.encoding_size();
153 let max_data_len = remaining_capacity.min(data_len);
154
155 if max_data_len == remaining_capacity {
157 self.is_last_frame = true;
158 return Ok(max_data_len);
159 }
160
161 self.is_last_frame = false;
162
163 let len_prefix_size = VarInt::try_from(max_data_len)
165 .map_err(|_| FitError)?
166 .encoding_size();
167
168 let prefixed_data_len = remaining_capacity
174 .checked_sub(len_prefix_size)
175 .ok_or(FitError)?;
176
177 let data_len = prefixed_data_len.min(data_len);
178
179 Ok(data_len)
180 }
181}
182
183decoder_parameterized_value!(
184 impl<'a, Data> Stream<Data> {
185 fn decode(tag: Tag, buffer: Buffer) -> Result<Self> {
186 let has_offset = tag & OFF_BIT == OFF_BIT;
187 let is_last_frame = tag & LEN_BIT != LEN_BIT;
188 let is_fin = tag & FIN_BIT == FIN_BIT;
189
190 let (stream_id, buffer) = buffer.decode()?;
191
192 let (offset, buffer) = if has_offset {
193 buffer.decode()?
194 } else {
195 (Default::default(), buffer)
196 };
197
198 let (data, buffer) = if !is_last_frame {
199 let (data, buffer) = buffer.decode_with_len_prefix::<VarInt, Data>()?;
200 (data, buffer)
201 } else {
202 let len = buffer.len();
203 let (data, buffer) = buffer.decode_slice(len)?;
204 let (data, remaining) = data.decode()?;
205 remaining.ensure_empty()?;
206 (data, buffer)
207 };
208
209 let frame = Stream {
210 stream_id,
211 offset,
212 is_last_frame,
213 is_fin,
214 data,
215 };
216
217 Ok((frame, buffer))
218 }
219 }
220);
221
222impl<Data: EncoderValue> EncoderValue for Stream<Data> {
223 #[inline]
224 fn encode<E: Encoder>(&self, buffer: &mut E) {
225 buffer.encode(&self.tag());
226 buffer.encode(&self.stream_id);
227
228 if *self.offset != 0 {
229 buffer.encode(&self.offset);
230 }
231
232 if self.is_last_frame {
233 buffer.encode(&self.data);
234 } else {
235 buffer.encode_with_len_prefix::<VarInt, _>(&self.data);
236 }
237 }
238
239 #[inline]
242 fn encoding_size_for_encoder<E: Encoder>(&self, encoder: &E) -> usize {
243 let mut len = 0;
244 len += size_of::<Tag>();
245 len += self.stream_id.encoding_size();
246
247 if *self.offset != 0 {
248 len += self.offset.encoding_size();
249 }
250
251 let data_len = self.data.encoding_size_for_encoder(encoder);
252 len += data_len;
253
254 if !self.is_last_frame {
256 len += VarInt::try_from(data_len).unwrap().encoding_size();
257 }
258
259 if cfg!(debug_assertions) {
261 use s2n_codec::EncoderLenEstimator;
262
263 let mut estimator = EncoderLenEstimator::new(encoder.remaining_capacity());
264 self.encode(&mut estimator);
265 assert_eq!(estimator.len(), len);
266 }
267
268 len
269 }
270}
271
272impl<'a> From<Stream<DecoderBuffer<'a>>> for StreamRef<'a> {
273 #[inline]
274 fn from(s: Stream<DecoderBuffer<'a>>) -> Self {
275 s.map_data(|data| data.into_less_safe_slice())
276 }
277}
278
279impl<'a> From<Stream<DecoderBufferMut<'a>>> for StreamRef<'a> {
280 #[inline]
281 fn from(s: Stream<DecoderBufferMut<'a>>) -> Self {
282 s.map_data(|data| &*data.into_less_safe_slice())
283 }
284}
285
286impl<'a> From<Stream<DecoderBufferMut<'a>>> for StreamMut<'a> {
287 #[inline]
288 fn from(s: Stream<DecoderBufferMut<'a>>) -> Self {
289 s.map_data(|data| data.into_less_safe_slice())
290 }
291}
292
#[cfg(test)]
mod tests {
    use super::*;
    use crate::frame::Padding;
    use bolero::check;
    use core::convert::TryInto;

    /// Checks the `try_fit` contract for one combination of inputs.
    ///
    /// A `Padding` payload of `length` bytes stands in for real stream data.
    /// After a successful fit the payload is truncated to the returned length
    /// and the resulting encoding size is compared against `capacity`.
    fn model(stream_id: VarInt, offset: VarInt, length: VarInt, capacity: usize) {
        let length: usize = match length.try_into() {
            Ok(length) => length,
            // Lengths that do not fit in `usize` cannot be modelled.
            Err(_) => return,
        };

        let mut frame = Stream {
            stream_id,
            offset,
            is_last_frame: false,
            is_fin: false,
            data: Padding { length },
        };

        match frame.try_fit(capacity) {
            Ok(new_length) => {
                frame.data = Padding { length: new_length };

                // A successful fit must never overflow the buffer.
                assert!(
                    frame.encoding_size() <= capacity,
                    "the encoding_size should not exceed capacity {frame:#?}"
                );

                if new_length < length {
                    // The payload was truncated, so the frame should fill the
                    // buffer, give or take the size of the length prefix.
                    let slack = if frame.is_last_frame {
                        0
                    } else {
                        VarInt::try_from(new_length).unwrap().encoding_size()
                    };
                    let min = capacity - slack;
                    let max = capacity;

                    assert!(
                        (min..=max).contains(&frame.encoding_size()),
                        "encoding_size ({}) should match capacity ({capacity}) {frame:#?}",
                        frame.encoding_size(),
                    );
                }

                if frame.is_last_frame {
                    // Without a length prefix the frame has to use every byte.
                    assert_eq!(
                        frame.encoding_size(),
                        capacity,
                        "should only be the last frame if == capacity {frame:#?}"
                    );
                }
            }
            Err(_) => {
                assert!(
                    frame.encoding_size() > capacity,
                    "rejection should only occur when encoding size > capacity {frame:#?}"
                );
            }
        }
    }

    /// Drives `model` with generated `(stream_id, offset, length, capacity)` tuples.
    #[test]
    #[cfg_attr(kani, kani::proof, kani::unwind(1), kani::solver(kissat))]
    fn try_fit_test() {
        check!()
            .with_type()
            .cloned()
            .for_each(|(stream_id, offset, length, capacity)| {
                model(stream_id, offset, length, capacity);
            });
    }
}