use std::ops::Range;
use super::GetFrameType;
use crate::{
frame::EncodeSize,
sid::{StreamId, WriteStreamId, be_streamid},
util::{ContinuousData, WriteData},
varint::{VARINT_MAX, VarInt, WriteVarInt, be_varint},
};
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum Offset {
Zero,
NonZero,
}
impl From<Offset> for u8 {
fn from(offset: Offset) -> u8 {
match offset {
Offset::Zero => 0,
Offset::NonZero => 0x04,
}
}
}
impl From<u8> for Offset {
fn from(value: u8) -> Self {
match value & 0x04 {
0 => Offset::Zero,
_ => Offset::NonZero,
}
}
}
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum Len {
Sized,
Omit,
}
impl From<Len> for u8 {
fn from(length: Len) -> u8 {
match length {
Len::Sized => 0x02,
Len::Omit => 0,
}
}
}
impl From<u8> for Len {
fn from(value: u8) -> Self {
match value & 0x02 {
0 => Len::Omit,
_ => Len::Sized,
}
}
}
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum Fin {
Yes,
No,
}
impl From<Fin> for u8 {
fn from(fin: Fin) -> u8 {
match fin {
Fin::Yes => 0x01,
Fin::No => 0,
}
}
}
impl From<u8> for Fin {
fn from(value: u8) -> Self {
match value & 0x01 {
0 => Fin::No,
_ => Fin::Yes,
}
}
}
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct Flags(pub Offset, pub Len, pub Fin);
impl Flags {
pub fn has_offset(&self) -> bool {
self.0 == Offset::NonZero
}
pub fn with_offset(&mut self) {
self.0 = Offset::NonZero;
}
pub fn has_length(&self) -> bool {
self.1 == Len::Sized
}
pub fn with_length(&mut self) {
self.1 = Len::Sized;
}
pub fn is_fin(&self) -> bool {
self.2 == Fin::Yes
}
pub fn fin(&mut self) {
self.2 = Fin::Yes;
}
}
impl From<Flags> for u8 {
fn from(flags: Flags) -> u8 {
let offset_bit: u8 = flags.0.into();
let length_bit: u8 = flags.1.into();
let fin_bit: u8 = flags.2.into();
offset_bit | length_bit | fin_bit
}
}
impl From<u8> for Flags {
fn from(value: u8) -> Self {
Flags(Offset::from(value), Len::from(value), Fin::from(value))
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamFrame {
id: StreamId,
offset: VarInt,
length: usize,
flag: Flags,
}
pub const STREAM_FRAME_MAX_ENCODING_SIZE: usize = 1 + 8 + 8 + 8;
impl GetFrameType for StreamFrame {
fn frame_type(&self) -> super::FrameType {
super::FrameType::Stream(self.flag)
}
}
impl super::EncodeSize for StreamFrame {
fn max_encoding_size(&self) -> usize {
STREAM_FRAME_MAX_ENCODING_SIZE
}
fn encoding_size(&self) -> usize {
1 + self.id.encoding_size()
+ if self.offset.into_inner() != 0 {
self.offset.encoding_size()
} else {
0
}
+ if self.flag.has_length() {
VarInt::from_u64(self.length as u64)
.expect("msg length must be less than 2^62")
.encoding_size()
} else {
0
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EncodingStrategy {
carry_length: bool,
padding: usize,
}
impl EncodingStrategy {
pub fn carry_length(&self) -> bool {
self.carry_length
}
pub fn padding(&self) -> usize {
self.padding
}
}
impl StreamFrame {
pub fn new(id: StreamId, offset: u64, length: usize) -> Self {
assert!(offset <= VARINT_MAX);
Self {
id,
offset: VarInt::from_u64(offset)
.expect("offset of stream frame must be less than 2^62"),
length,
flag: Flags(
if offset != 0 {
Offset::NonZero
} else {
Offset::Zero
},
Len::Omit,
Fin::No,
),
}
}
pub fn stream_id(&self) -> StreamId {
self.id
}
pub fn is_fin(&self) -> bool {
self.flag.is_fin()
}
pub fn offset(&self) -> u64 {
self.offset.into_inner()
}
pub fn len(&self) -> usize {
self.length
}
pub fn is_empty(&self) -> bool {
self.length == 0
}
pub fn range(&self) -> Range<u64> {
self.offset.into_inner()..self.offset.into_inner() + self.length as u64
}
pub fn set_eos_flag(&mut self, is_eos: bool) {
if is_eos {
self.flag.2 = Fin::Yes;
} else {
self.flag.2 = Fin::No;
}
}
pub fn set_len_flag(&mut self, with_len: bool) {
if with_len {
self.flag.1 = Len::Sized;
} else {
self.flag.1 = Len::Omit;
}
}
pub fn encoding_strategy(&self, capacity: usize) -> EncodingStrategy {
debug_assert!(!self.flag.has_length());
let encoding_size_without_length = self.encoding_size() + self.length;
assert!(encoding_size_without_length <= capacity);
let len_encoding_size = VarInt::try_from(self.length)
.expect("length of stream frame must be less than 2^62")
.encoding_size();
let remaining = capacity - encoding_size_without_length;
if remaining >= len_encoding_size {
let remaining = remaining - len_encoding_size;
if remaining < STREAM_FRAME_MAX_ENCODING_SIZE {
EncodingStrategy {
carry_length: true,
padding: remaining,
}
} else {
EncodingStrategy {
carry_length: true,
padding: 0,
}
}
} else {
EncodingStrategy {
carry_length: false,
padding: remaining,
}
}
}
pub fn estimate_max_capacity(capacity: usize, sid: StreamId, offset: u64) -> Option<usize> {
assert!(offset <= VARINT_MAX);
let mut least = 1 + sid.encoding_size();
if offset != 0 {
least += VarInt::from_u64(offset).unwrap().encoding_size();
}
if capacity <= least {
None
} else {
Some(capacity - least)
}
}
}
pub fn stream_frame_with_flag(flag: Flags) -> impl Fn(&[u8]) -> nom::IResult<&[u8], StreamFrame> {
move |input| {
let (remain, id) = be_streamid(input)?;
let (remain, offset) = if flag.has_offset() {
be_varint(remain)?
} else {
(remain, VarInt::default())
};
let (remain, length) = if flag.has_length() {
let (remain, length) = be_varint(remain)?;
(remain, length.into_inner() as usize)
} else {
(remain, remain.len())
};
if offset.into_inner() + length as u64 > VARINT_MAX {
return Err(nom::Err::Error(nom::error::make_error(
input,
nom::error::ErrorKind::TooLarge,
)));
}
Ok((
remain,
StreamFrame {
id,
offset,
length,
flag,
},
))
}
}
impl<T, D> super::io::WriteDataFrame<StreamFrame, D> for T
where
T: bytes::BufMut + WriteData<D>,
D: ContinuousData,
{
fn put_data_frame(&mut self, frame: &StreamFrame, data: &D) {
let stream_type = frame.frame_type();
self.put_varint(&stream_type.into());
self.put_streamid(&frame.id);
if frame.offset.into_inner() != 0 {
self.put_varint(&frame.offset);
}
if frame.flag.has_length() {
self.put_varint(&VarInt::from_u32(frame.length as u32));
}
self.put_data(data);
}
}
#[cfg(test)]
mod tests {
use bytes::Bytes;
use nom::{Parser, combinator::flat_map};
use super::*;
use crate::{
frame::{EncodeSize, FrameType, GetFrameType, io::WriteDataFrame},
varint::{VarInt, be_varint},
};
#[test]
fn test_stream_frame() {
let stream_frame = StreamFrame {
id: VarInt::from_u32(0x1234).into(),
offset: VarInt::from_u32(0x1234),
length: 11,
flag: Flags(Offset::NonZero, Len::Sized, Fin::No),
};
assert_eq!(
stream_frame.frame_type(),
FrameType::Stream(Flags(Offset::NonZero, Len::Sized, Fin::No))
);
assert_eq!(stream_frame.max_encoding_size(), 1 + 8 + 8 + 8);
assert_eq!(stream_frame.encoding_size(), 1 + 2 + 2 + 1);
}
#[test]
fn test_read_stream_frame() {
let raw = Bytes::from_static(&[
0x0e, 0x52, 0x34, 0x52, 0x34, 0x0b, b'h', b'e', b'l', b'l', b'o', b' ', b'w', b'o',
b'r', b'l', b'd', 0,
]);
let input = raw.as_ref();
let (input, frame) = flat_map(be_varint, |frame_type| {
let stream_frame_type: VarInt =
FrameType::Stream(Flags(Offset::NonZero, Len::Sized, Fin::No)).into();
assert_eq!(frame_type, stream_frame_type);
stream_frame_with_flag(Flags(Offset::NonZero, Len::Sized, Fin::No))
})
.parse(input)
.unwrap();
assert_eq!(
input,
&[
b'h', b'e', b'l', b'l', b'o', b' ', b'w', b'o', b'r', b'l', b'd', 0,
][..]
);
assert_eq!(
frame,
StreamFrame {
id: VarInt::from_u32(0x1234).into(),
offset: VarInt::from_u32(0x1234),
length: 11,
flag: Flags(Offset::NonZero, Len::Sized, Fin::No),
}
);
}
#[test]
fn test_read_last_stream_frame() {
let raw = Bytes::from_static(&[
0x0c, 0x52, 0x34, 0x52, 0x34, b'h', b'e', b'l', b'l', b'o', b' ', b'w', b'o', b'r',
b'l', b'd',
]);
let input = raw.as_ref();
let (input, frame) = flat_map(be_varint, |frame_type| {
let stream_frame_type: VarInt =
FrameType::Stream(Flags(Offset::NonZero, Len::Omit, Fin::No)).into();
assert_eq!(frame_type, stream_frame_type);
stream_frame_with_flag(Flags(Offset::NonZero, Len::Omit, Fin::No))
})
.parse(input)
.unwrap();
assert_eq!(
input,
&[
b'h', b'e', b'l', b'l', b'o', b' ', b'w', b'o', b'r', b'l', b'd',
][..]
);
assert_eq!(
frame,
StreamFrame {
id: VarInt::from_u32(0x1234).into(),
offset: VarInt::from_u32(0x1234),
length: 11,
flag: Flags(Offset::NonZero, Len::Omit, Fin::No),
}
);
}
#[test]
fn test_write_initial_stream_frame() {
let mut buf = Vec::new();
let frame = StreamFrame {
id: VarInt::from_u32(0x1234).into(),
offset: VarInt::from_u32(0),
length: 11,
flag: Flags(Offset::Zero, Len::Sized, Fin::Yes),
};
buf.put_data_frame(&frame, b"hello world");
assert_eq!(
buf,
vec![
0xb, 0x52, 0x34, 0x0b, b'h', b'e', b'l', b'l', b'o', b' ', b'w', b'o', b'r', b'l',
b'd'
]
);
}
#[test]
fn test_write_last_stream_frame() {
let mut buf = Vec::new();
let frame = StreamFrame {
id: VarInt::from_u32(0x1234).into(),
offset: VarInt::from_u32(0),
length: 11,
flag: Flags(Offset::Zero, Len::Omit, Fin::Yes),
};
buf.put_data_frame(&frame, b"hello world");
assert_eq!(
buf,
vec![
0x9, 0x52, 0x34, b'h', b'e', b'l', b'l', b'o', b' ', b'w', b'o', b'r', b'l', b'd'
]
);
}
#[test]
fn test_write_eos_frame() {
let mut buf = Vec::new();
let frame = StreamFrame {
id: VarInt::from_u32(0x1234).into(),
offset: VarInt::from_u32(0x1234),
length: 11,
flag: Flags(Offset::NonZero, Len::Sized, Fin::Yes),
};
buf.put_data_frame(&frame, b"hello world");
assert_eq!(
buf,
vec![
0x0f, 0x52, 0x34, 0x52, 0x34, 0x0b, b'h', b'e', b'l', b'l', b'o', b' ', b'w', b'o',
b'r', b'l', b'd'
]
);
}
#[test]
fn test_write_unfinished_stream_frame() {
let mut buf = Vec::new();
let frame = StreamFrame {
id: VarInt::from_u32(0x1234).into(),
offset: VarInt::from_u32(0x1234),
length: 11,
flag: Flags(Offset::NonZero, Len::Sized, Fin::No),
};
buf.put_data_frame(&frame, b"hello world");
assert_eq!(
buf,
vec![
0x0e, 0x52, 0x34, 0x52, 0x34, 0x0b, b'h', b'e', b'l', b'l', b'o', b' ', b'w', b'o',
b'r', b'l', b'd'
]
);
}
}