use super::types::ObjectStatus;
use crate::error::CodecError;
use crate::types::read_bytes;
use crate::varint::VarInt;
use bytes::{Buf, BufMut};
/// Numeric identifiers of the supported stream kinds.
///
/// Each variant's discriminant is the identifier that
/// [`StreamType::from_id`] maps back to that variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum StreamType {
    /// Identifier `0x01`.
    Datagram = 0x01,
    /// Identifier `0x02`.
    DatagramStatus = 0x02,
    /// Identifier `0x04`.
    Subgroup = 0x04,
    /// Identifier `0x05`.
    Fetch = 0x05,
}

impl StreamType {
    /// Resolves a raw identifier to its `StreamType`.
    ///
    /// Returns `None` when `id` does not match the discriminant of any variant.
    pub fn from_id(id: u64) -> Option<Self> {
        // Compare against the discriminants themselves so the id values are
        // written down only once, in the enum definition.
        const KNOWN: [StreamType; 4] = [
            StreamType::Datagram,
            StreamType::DatagramStatus,
            StreamType::Subgroup,
            StreamType::Fetch,
        ];
        KNOWN.iter().copied().find(|kind| *kind as u64 == id)
    }
}
fn skip_extensions(buf: &mut impl Buf, count: u64) -> Result<Vec<u8>, CodecError> {
let mut raw = Vec::new();
for _ in 0..count {
let ext_type = VarInt::decode(buf)?;
ext_type.encode(&mut raw);
if ext_type.into_inner() % 2 == 0 {
let val = VarInt::decode(buf)?;
val.encode(&mut raw);
} else {
let len = VarInt::decode(buf)?.into_inner() as usize;
VarInt::from_usize(len).encode(&mut raw);
let bytes = read_bytes(buf, len)?;
raw.extend_from_slice(&bytes);
}
}
Ok(raw)
}
fn encode_extensions(extensions: &[u8], buf: &mut impl BufMut) {
buf.put_slice(extensions);
}
/// Fields of a subgroup header.
///
/// `SubgroupHeader::encode` writes the fields in declaration order: three
/// varints followed by one priority byte.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubgroupHeader {
/// Track alias, carried as a varint.
pub track_alias: VarInt,
/// Group id, carried as a varint.
pub group_id: VarInt,
/// Subgroup id, carried as a varint.
pub subgroup_id: VarInt,
/// Publisher priority, carried as a single byte.
pub publisher_priority: u8,
}
/// Fields of a per-object header.
///
/// `ObjectHeader::encode` writes `object_id`, `extension_count`, the raw
/// `extensions` bytes and `payload_length`, in that order. `object_status` is
/// written only when `payload_length` is zero.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ObjectHeader {
/// Object id, carried as a varint.
pub object_id: VarInt,
/// Number of extension entries that `decode` reads from the buffer.
pub extension_count: VarInt,
/// Extension entries in serialized form; `encode` writes them verbatim.
pub extensions: Vec<u8>,
/// Payload length, carried as a varint. Zero means a status varint follows.
pub payload_length: VarInt,
/// Object status. `decode` sets `ObjectStatus::Normal` when `payload_length` is non-zero.
pub object_status: ObjectStatus,
}
impl SubgroupHeader {
    /// Serializes the header into `buf`: track alias, group id and subgroup id
    /// as varints, then the publisher priority as one byte.
    pub fn encode(&self, buf: &mut impl BufMut) {
        let Self { track_alias, group_id, subgroup_id, publisher_priority } = self;
        track_alias.encode(buf);
        group_id.encode(buf);
        subgroup_id.encode(buf);
        buf.put_u8(*publisher_priority);
    }

    /// Parses a header from `buf`, reading the fields in the order `encode`
    /// writes them.
    ///
    /// # Errors
    ///
    /// Propagates varint decoding failures and returns
    /// [`CodecError::UnexpectedEnd`] when no byte is left for the priority.
    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
        let track_alias = VarInt::decode(buf)?;
        let group_id = VarInt::decode(buf)?;
        let subgroup_id = VarInt::decode(buf)?;
        // `get_u8` panics on an empty buffer, so check before reading.
        if !buf.has_remaining() {
            return Err(CodecError::UnexpectedEnd);
        }
        Ok(Self {
            track_alias,
            group_id,
            subgroup_id,
            publisher_priority: buf.get_u8(),
        })
    }
}
impl ObjectHeader {
    /// Serializes the header into `buf`: object id, extension count, the raw
    /// extension bytes and the payload length. The object status is appended
    /// as a varint only when the payload length is zero.
    pub fn encode(&self, buf: &mut impl BufMut) {
        let Self { object_id, extension_count, extensions, payload_length, object_status } = self;
        object_id.encode(buf);
        extension_count.encode(buf);
        encode_extensions(extensions, buf);
        payload_length.encode(buf);
        // A non-empty payload carries no status field.
        if payload_length.into_inner() != 0 {
            return;
        }
        VarInt::from_usize(*object_status as usize).encode(buf);
    }

    /// Parses a header from `buf`, reading the fields in the order `encode`
    /// writes them.
    ///
    /// When the payload length is non-zero no status is read and
    /// `ObjectStatus::Normal` is used.
    ///
    /// # Errors
    ///
    /// Propagates varint and extension decoding failures and returns
    /// [`CodecError::InvalidField`] when the status value is not accepted by
    /// `ObjectStatus::from_u64`.
    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
        let object_id = VarInt::decode(buf)?;
        let extension_count = VarInt::decode(buf)?;
        let extensions = skip_extensions(buf, extension_count.into_inner())?;
        let payload_length = VarInt::decode(buf)?;
        let object_status = match payload_length.into_inner() {
            0 => {
                let code = VarInt::decode(buf)?.into_inner();
                match ObjectStatus::from_u64(code) {
                    Some(status) => status,
                    None => return Err(CodecError::InvalidField),
                }
            }
            _ => ObjectStatus::Normal,
        };
        Ok(Self {
            object_id,
            extension_count,
            extensions,
            payload_length,
            object_status,
        })
    }
}
/// Fields of a datagram header.
///
/// `DatagramHeader::encode` writes `track_alias`, `group_id`, `object_id`,
/// `publisher_priority`, `extension_count`, the raw `extensions` bytes and
/// `payload_length`, in that order. `object_status` is written only when
/// `payload_length` is zero.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatagramHeader {
/// Track alias, carried as a varint.
pub track_alias: VarInt,
/// Group id, carried as a varint.
pub group_id: VarInt,
/// Object id, carried as a varint.
pub object_id: VarInt,
/// Publisher priority, carried as a single byte.
pub publisher_priority: u8,
/// Number of extension entries that `decode` reads from the buffer.
pub extension_count: VarInt,
/// Extension entries in serialized form; `encode` writes them verbatim.
pub extensions: Vec<u8>,
/// Object status. `decode` sets `ObjectStatus::Normal` when `payload_length` is non-zero.
pub object_status: ObjectStatus,
/// Payload length, carried as a varint. Zero means a status varint follows.
pub payload_length: VarInt,
}
impl DatagramHeader {
    /// Serializes the header into `buf`: track alias, group id and object id
    /// as varints, the publisher priority byte, the extension count, the raw
    /// extension bytes and the payload length. The object status is appended
    /// as a varint only when the payload length is zero.
    pub fn encode(&self, buf: &mut impl BufMut) {
        let Self {
            track_alias,
            group_id,
            object_id,
            publisher_priority,
            extension_count,
            extensions,
            object_status,
            payload_length,
        } = self;
        track_alias.encode(buf);
        group_id.encode(buf);
        object_id.encode(buf);
        buf.put_u8(*publisher_priority);
        extension_count.encode(buf);
        encode_extensions(extensions, buf);
        payload_length.encode(buf);
        // A non-empty payload carries no status field.
        if payload_length.into_inner() != 0 {
            return;
        }
        VarInt::from_usize(*object_status as usize).encode(buf);
    }

    /// Parses a header from `buf`, reading the fields in the order `encode`
    /// writes them.
    ///
    /// When the payload length is non-zero no status is read and
    /// `ObjectStatus::Normal` is used.
    ///
    /// # Errors
    ///
    /// Propagates varint and extension decoding failures, returns
    /// [`CodecError::UnexpectedEnd`] when no byte is left for the priority,
    /// and returns [`CodecError::InvalidField`] when the status value is not
    /// accepted by `ObjectStatus::from_u64`.
    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
        let track_alias = VarInt::decode(buf)?;
        let group_id = VarInt::decode(buf)?;
        let object_id = VarInt::decode(buf)?;
        // `get_u8` panics on an empty buffer, so check before reading.
        if !buf.has_remaining() {
            return Err(CodecError::UnexpectedEnd);
        }
        let publisher_priority = buf.get_u8();
        let extension_count = VarInt::decode(buf)?;
        let extensions = skip_extensions(buf, extension_count.into_inner())?;
        let payload_length = VarInt::decode(buf)?;
        let object_status = match payload_length.into_inner() {
            0 => {
                let code = VarInt::decode(buf)?.into_inner();
                match ObjectStatus::from_u64(code) {
                    Some(status) => status,
                    None => return Err(CodecError::InvalidField),
                }
            }
            _ => ObjectStatus::Normal,
        };
        Ok(Self {
            track_alias,
            group_id,
            object_id,
            publisher_priority,
            extension_count,
            extensions,
            object_status,
            payload_length,
        })
    }
}
/// Fields of a datagram status header.
///
/// `DatagramStatusHeader::encode` writes the fields in declaration order; the
/// status is always present on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatagramStatusHeader {
/// Track alias, carried as a varint.
pub track_alias: VarInt,
/// Group id, carried as a varint.
pub group_id: VarInt,
/// Object id, carried as a varint.
pub object_id: VarInt,
/// Publisher priority, carried as a single byte.
pub publisher_priority: u8,
/// Object status, carried as a varint.
pub object_status: ObjectStatus,
}
impl DatagramStatusHeader {
    /// Serializes the header into `buf`: track alias, group id and object id
    /// as varints, the publisher priority byte, then the object status as a
    /// varint.
    pub fn encode(&self, buf: &mut impl BufMut) {
        let Self { track_alias, group_id, object_id, publisher_priority, object_status } = self;
        track_alias.encode(buf);
        group_id.encode(buf);
        object_id.encode(buf);
        buf.put_u8(*publisher_priority);
        let status = VarInt::from_usize(*object_status as usize);
        status.encode(buf);
    }

    /// Parses a header from `buf`, reading the fields in the order `encode`
    /// writes them.
    ///
    /// # Errors
    ///
    /// Propagates varint decoding failures, returns
    /// [`CodecError::UnexpectedEnd`] when no byte is left for the priority,
    /// and returns [`CodecError::InvalidField`] when the status value is not
    /// accepted by `ObjectStatus::from_u64`.
    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
        let track_alias = VarInt::decode(buf)?;
        let group_id = VarInt::decode(buf)?;
        let object_id = VarInt::decode(buf)?;
        // `get_u8` panics on an empty buffer, so check before reading.
        if !buf.has_remaining() {
            return Err(CodecError::UnexpectedEnd);
        }
        let publisher_priority = buf.get_u8();
        let code = VarInt::decode(buf)?.into_inner();
        match ObjectStatus::from_u64(code) {
            Some(object_status) => Ok(Self {
                track_alias,
                group_id,
                object_id,
                publisher_priority,
                object_status,
            }),
            None => Err(CodecError::InvalidField),
        }
    }
}
/// Fields of a fetch header; it carries a single varint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchHeader {
/// Subscribe id, carried as a varint.
pub subscribe_id: VarInt,
}
/// Fields of a per-object header used with fetches.
///
/// `FetchObjectHeader::encode` writes `group_id`, `subgroup_id`, `object_id`,
/// `publisher_priority`, `extension_count`, the raw `extensions` bytes and
/// `payload_length`, in that order. `object_status` is written only when
/// `payload_length` is zero.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchObjectHeader {
/// Group id, carried as a varint.
pub group_id: VarInt,
/// Subgroup id, carried as a varint.
pub subgroup_id: VarInt,
/// Object id, carried as a varint.
pub object_id: VarInt,
/// Publisher priority, carried as a single byte.
pub publisher_priority: u8,
/// Number of extension entries that `decode` reads from the buffer.
pub extension_count: VarInt,
/// Extension entries in serialized form; `encode` writes them verbatim.
pub extensions: Vec<u8>,
/// Object status. `decode` sets `ObjectStatus::Normal` when `payload_length` is non-zero.
pub object_status: ObjectStatus,
/// Payload length, carried as a varint. Zero means a status varint follows.
pub payload_length: VarInt,
}
impl FetchHeader {
    /// Serializes the header into `buf` as a single varint (the subscribe id).
    pub fn encode(&self, buf: &mut impl BufMut) {
        let Self { subscribe_id } = self;
        subscribe_id.encode(buf);
    }

    /// Parses a header from `buf` by reading one varint.
    ///
    /// # Errors
    ///
    /// Propagates the varint decoding failure, if any.
    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
        Ok(Self {
            subscribe_id: VarInt::decode(buf)?,
        })
    }
}
impl FetchObjectHeader {
    /// Serializes the header into `buf`: group id, subgroup id and object id
    /// as varints, the publisher priority byte, the extension count, the raw
    /// extension bytes and the payload length. The object status is appended
    /// as a varint only when the payload length is zero.
    pub fn encode(&self, buf: &mut impl BufMut) {
        let Self {
            group_id,
            subgroup_id,
            object_id,
            publisher_priority,
            extension_count,
            extensions,
            object_status,
            payload_length,
        } = self;
        group_id.encode(buf);
        subgroup_id.encode(buf);
        object_id.encode(buf);
        buf.put_u8(*publisher_priority);
        extension_count.encode(buf);
        encode_extensions(extensions, buf);
        payload_length.encode(buf);
        // A non-empty payload carries no status field.
        if payload_length.into_inner() != 0 {
            return;
        }
        VarInt::from_usize(*object_status as usize).encode(buf);
    }

    /// Parses a header from `buf`, reading the fields in the order `encode`
    /// writes them.
    ///
    /// When the payload length is non-zero no status is read and
    /// `ObjectStatus::Normal` is used.
    ///
    /// # Errors
    ///
    /// Propagates varint and extension decoding failures, returns
    /// [`CodecError::UnexpectedEnd`] when no byte is left for the priority,
    /// and returns [`CodecError::InvalidField`] when the status value is not
    /// accepted by `ObjectStatus::from_u64`.
    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
        let group_id = VarInt::decode(buf)?;
        let subgroup_id = VarInt::decode(buf)?;
        let object_id = VarInt::decode(buf)?;
        // `get_u8` panics on an empty buffer, so check before reading.
        if !buf.has_remaining() {
            return Err(CodecError::UnexpectedEnd);
        }
        let publisher_priority = buf.get_u8();
        let extension_count = VarInt::decode(buf)?;
        let extensions = skip_extensions(buf, extension_count.into_inner())?;
        let payload_length = VarInt::decode(buf)?;
        let object_status = match payload_length.into_inner() {
            0 => {
                let code = VarInt::decode(buf)?.into_inner();
                match ObjectStatus::from_u64(code) {
                    Some(status) => status,
                    None => return Err(CodecError::InvalidField),
                }
            }
            _ => ObjectStatus::Normal,
        };
        Ok(Self {
            group_id,
            subgroup_id,
            object_id,
            publisher_priority,
            extension_count,
            extensions,
            object_status,
            payload_length,
        })
    }
}