use bytes::{Buf, BufMut, BytesMut};
use tokio_util::codec::{Decoder, Encoder};
pub const CONTROL_FRAME_LENGTH_MAX: usize = 512;
pub const FRAME_LENGTH_MAX: usize = 128 * 1024;
pub const FRAMESTREAMS_CONTROL_ACCEPT: u32 = 0x01;
pub const FRAMESTREAMS_CONTROL_START: u32 = 0x02;
pub const FRAMESTREAMS_CONTROL_STOP: u32 = 0x03;
pub const FRAMESTREAMS_CONTROL_READY: u32 = 0x04;
pub const FRAMESTREAMS_CONTROL_FINISH: u32 = 0x05;
pub const FRAMESTREAMS_CONTROL_FIELD_CONTENT_TYPE: u32 = 0x01;
pub const FRAMESTREAMS_ESCAPE_SEQUENCE: u32 = 0x00;
#[derive(Debug)]
pub enum Frame {
ControlReady(BytesMut),
ControlAccept(BytesMut),
ControlStart(BytesMut),
ControlStop,
ControlFinish,
ControlUnknown(BytesMut),
Data(BytesMut),
}
pub struct FrameStreamsCodec {}
impl Encoder<Frame> for FrameStreamsCodec {
type Error = std::io::Error;
fn encode(&mut self, frame: Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
match frame {
Frame::ControlReady(payload) => {
dst.put_u32(FRAMESTREAMS_ESCAPE_SEQUENCE);
dst.put_u32(4);
dst.put_u32(FRAMESTREAMS_CONTROL_READY);
dst.put(payload);
}
Frame::ControlAccept(payload) => {
dst.put_u32(FRAMESTREAMS_ESCAPE_SEQUENCE);
dst.put_u32(4 + payload.len() as u32);
dst.put_u32(FRAMESTREAMS_CONTROL_ACCEPT);
dst.put(payload);
}
Frame::ControlStart(payload) => {
dst.put_u32(FRAMESTREAMS_ESCAPE_SEQUENCE);
dst.put_u32(4 + payload.len() as u32);
dst.put_u32(FRAMESTREAMS_CONTROL_START);
dst.put(payload);
}
Frame::ControlStop => {
dst.put_u32(FRAMESTREAMS_ESCAPE_SEQUENCE);
dst.put_u32(4);
dst.put_u32(FRAMESTREAMS_CONTROL_STOP);
}
Frame::ControlFinish => {
dst.put_u32(FRAMESTREAMS_ESCAPE_SEQUENCE);
dst.put_u32(4);
dst.put_u32(FRAMESTREAMS_CONTROL_FINISH);
}
Frame::ControlUnknown(_) => todo!(),
Frame::Data(payload) => {
dst.put_u32(payload.len() as u32);
dst.put(payload);
}
}
Ok(())
}
}
impl Decoder for FrameStreamsCodec {
type Item = Frame;
type Error = std::io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.len() < 4 {
return Ok(None);
}
let mut len_frame_bytes = [0u8; 4];
len_frame_bytes.copy_from_slice(&src[..4]);
let len_frame = u32::from_be_bytes(len_frame_bytes) as usize;
if len_frame > FRAME_LENGTH_MAX {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"Frame of length {} is too large for this implementation",
len_frame
),
));
}
if len_frame == FRAMESTREAMS_ESCAPE_SEQUENCE as usize {
if src.len() < 4 + 4 {
return Ok(None);
}
let mut len_control_bytes = [0u8; 4];
len_control_bytes.copy_from_slice(&src[4..8]);
let len_control = u32::from_be_bytes(len_control_bytes) as usize;
if len_control < 4 {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Control frame of length {} is too small", len_control),
));
}
if len_control > CONTROL_FRAME_LENGTH_MAX {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Control frame of length {} is too large", len_control),
));
}
if src.len() < 4 + 4 + len_control {
return Ok(None);
}
src.advance(4 + 4);
let control_frame_type = src.get_u32();
let payload = src.split_to(len_control - 4);
Ok(Some(match control_frame_type {
FRAMESTREAMS_CONTROL_READY => Frame::ControlReady(payload),
FRAMESTREAMS_CONTROL_ACCEPT => Frame::ControlAccept(payload),
FRAMESTREAMS_CONTROL_START => Frame::ControlStart(payload),
FRAMESTREAMS_CONTROL_STOP => Frame::ControlStop,
FRAMESTREAMS_CONTROL_FINISH => Frame::ControlFinish,
_ => Frame::ControlUnknown(payload),
}))
} else {
if src.len() < 4 + len_frame {
return Ok(None);
}
src.advance(4);
let data = src.split_to(len_frame);
Ok(Some(Frame::Data(data)))
}
}
}
pub fn encode_content_type_payload(content_type: &[u8]) -> BytesMut {
let mut buf = BytesMut::with_capacity(4 + 4 + content_type.len());
buf.put_u32(FRAMESTREAMS_CONTROL_FIELD_CONTENT_TYPE);
buf.put_u32(content_type.len() as u32);
buf.put(content_type);
buf
}