use super::types::ObjectStatus;
use crate::error::CodecError;
use crate::types::read_bytes;
use crate::varint::VarInt;
use bytes::{Buf, BufMut};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum StreamType {
Datagram = 0x01,
DatagramStatus = 0x02,
Subgroup = 0x04,
Fetch = 0x05,
}
impl StreamType {
pub fn from_id(id: u64) -> Option<Self> {
match id {
0x01 => Some(StreamType::Datagram),
0x02 => Some(StreamType::DatagramStatus),
0x04 => Some(StreamType::Subgroup),
0x05 => Some(StreamType::Fetch),
_ => None,
}
}
}
fn read_extension_bytes(buf: &mut impl Buf, byte_len: u64) -> Result<Vec<u8>, CodecError> {
read_bytes(buf, byte_len as usize)
}
fn encode_extensions(extensions: &[u8], buf: &mut impl BufMut) {
buf.put_slice(extensions);
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubgroupHeader {
pub track_alias: VarInt,
pub group_id: VarInt,
pub subgroup_id: VarInt,
pub publisher_priority: u8,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ObjectHeader {
pub object_id: VarInt,
pub extension_headers_length: VarInt,
pub extensions: Vec<u8>,
pub payload_length: VarInt,
pub object_status: ObjectStatus,
}
impl SubgroupHeader {
pub fn encode(&self, buf: &mut impl BufMut) {
self.track_alias.encode(buf);
self.group_id.encode(buf);
self.subgroup_id.encode(buf);
buf.put_u8(self.publisher_priority);
}
pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
let track_alias = VarInt::decode(buf)?;
let group_id = VarInt::decode(buf)?;
let subgroup_id = VarInt::decode(buf)?;
if buf.remaining() < 1 {
return Err(CodecError::UnexpectedEnd);
}
let publisher_priority = buf.get_u8();
Ok(Self { track_alias, group_id, subgroup_id, publisher_priority })
}
}
impl ObjectHeader {
pub fn encode(&self, buf: &mut impl BufMut) {
self.object_id.encode(buf);
self.extension_headers_length.encode(buf);
encode_extensions(&self.extensions, buf);
self.payload_length.encode(buf);
if self.payload_length.into_inner() == 0 {
VarInt::from_usize(self.object_status as usize).encode(buf);
}
}
pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
let object_id = VarInt::decode(buf)?;
let extension_headers_length = VarInt::decode(buf)?;
let extensions = read_extension_bytes(buf, extension_headers_length.into_inner())?;
let payload_length = VarInt::decode(buf)?;
let object_status = if payload_length.into_inner() == 0 {
let status_val = VarInt::decode(buf)?.into_inner();
ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?
} else {
ObjectStatus::Normal
};
Ok(Self { object_id, extension_headers_length, extensions, payload_length, object_status })
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatagramHeader {
pub track_alias: VarInt,
pub group_id: VarInt,
pub object_id: VarInt,
pub publisher_priority: u8,
pub extension_headers_length: VarInt,
pub extensions: Vec<u8>,
}
impl DatagramHeader {
pub fn encode(&self, buf: &mut impl BufMut) {
self.track_alias.encode(buf);
self.group_id.encode(buf);
self.object_id.encode(buf);
buf.put_u8(self.publisher_priority);
self.extension_headers_length.encode(buf);
encode_extensions(&self.extensions, buf);
}
pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
let track_alias = VarInt::decode(buf)?;
let group_id = VarInt::decode(buf)?;
let object_id = VarInt::decode(buf)?;
if buf.remaining() < 1 {
return Err(CodecError::UnexpectedEnd);
}
let publisher_priority = buf.get_u8();
let extension_headers_length = VarInt::decode(buf)?;
let extensions = read_extension_bytes(buf, extension_headers_length.into_inner())?;
Ok(Self {
track_alias,
group_id,
object_id,
publisher_priority,
extension_headers_length,
extensions,
})
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatagramStatusHeader {
pub track_alias: VarInt,
pub group_id: VarInt,
pub object_id: VarInt,
pub publisher_priority: u8,
pub extension_headers_length: VarInt,
pub extensions: Vec<u8>,
pub object_status: ObjectStatus,
}
impl DatagramStatusHeader {
pub fn encode(&self, buf: &mut impl BufMut) {
self.track_alias.encode(buf);
self.group_id.encode(buf);
self.object_id.encode(buf);
buf.put_u8(self.publisher_priority);
self.extension_headers_length.encode(buf);
encode_extensions(&self.extensions, buf);
VarInt::from_usize(self.object_status as usize).encode(buf);
}
pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
let track_alias = VarInt::decode(buf)?;
let group_id = VarInt::decode(buf)?;
let object_id = VarInt::decode(buf)?;
if buf.remaining() < 1 {
return Err(CodecError::UnexpectedEnd);
}
let publisher_priority = buf.get_u8();
let extension_headers_length = VarInt::decode(buf)?;
let extensions = read_extension_bytes(buf, extension_headers_length.into_inner())?;
let status_val = VarInt::decode(buf)?.into_inner();
let object_status = ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?;
Ok(Self {
track_alias,
group_id,
object_id,
publisher_priority,
extension_headers_length,
extensions,
object_status,
})
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchHeader {
pub subscribe_id: VarInt,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchObjectHeader {
pub group_id: VarInt,
pub subgroup_id: VarInt,
pub object_id: VarInt,
pub publisher_priority: u8,
pub extension_headers_length: VarInt,
pub extensions: Vec<u8>,
pub object_status: ObjectStatus,
pub payload_length: VarInt,
}
impl FetchHeader {
pub fn encode(&self, buf: &mut impl BufMut) {
self.subscribe_id.encode(buf);
}
pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
let subscribe_id = VarInt::decode(buf)?;
Ok(Self { subscribe_id })
}
}
impl FetchObjectHeader {
pub fn encode(&self, buf: &mut impl BufMut) {
self.group_id.encode(buf);
self.subgroup_id.encode(buf);
self.object_id.encode(buf);
buf.put_u8(self.publisher_priority);
self.extension_headers_length.encode(buf);
encode_extensions(&self.extensions, buf);
self.payload_length.encode(buf);
if self.payload_length.into_inner() == 0 {
VarInt::from_usize(self.object_status as usize).encode(buf);
}
}
pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
let group_id = VarInt::decode(buf)?;
let subgroup_id = VarInt::decode(buf)?;
let object_id = VarInt::decode(buf)?;
if buf.remaining() < 1 {
return Err(CodecError::UnexpectedEnd);
}
let publisher_priority = buf.get_u8();
let extension_headers_length = VarInt::decode(buf)?;
let extensions = read_extension_bytes(buf, extension_headers_length.into_inner())?;
let payload_length = VarInt::decode(buf)?;
let object_status = if payload_length.into_inner() == 0 {
let status_val = VarInt::decode(buf)?.into_inner();
ObjectStatus::from_u64(status_val).ok_or(CodecError::InvalidField)?
} else {
ObjectStatus::Normal
};
Ok(Self {
group_id,
subgroup_id,
object_id,
publisher_priority,
extension_headers_length,
extensions,
object_status,
payload_length,
})
}
}