use crate::error::{
CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_REASON_PHRASE_LENGTH,
};
use crate::kvp::KeyValuePair;
use crate::types::read_bytes;
use crate::types::*;
use crate::varint::VarInt;
use bytes::{Buf, BufMut};
/// Numeric identifiers of the control messages this codec understands.
///
/// The discriminant of each variant is the value carried in the message's
/// leading type field; [`MessageType::id`] and [`MessageType::from_id`]
/// convert in either direction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum MessageType {
    SubscribeUpdate = 0x02,
    Subscribe = 0x03,
    SubscribeOk = 0x04,
    SubscribeError = 0x05,
    Announce = 0x06,
    AnnounceOk = 0x07,
    AnnounceError = 0x08,
    Unannounce = 0x09,
    Unsubscribe = 0x0A,
    SubscribeDone = 0x0B,
    AnnounceCancel = 0x0C,
    TrackStatusRequest = 0x0D,
    TrackStatus = 0x0E,
    GoAway = 0x10,
    SubscribeAnnounces = 0x11,
    SubscribeAnnouncesOk = 0x12,
    SubscribeAnnouncesError = 0x13,
    UnsubscribeAnnounces = 0x14,
    MaxSubscribeId = 0x15,
    Fetch = 0x16,
    FetchCancel = 0x17,
    FetchOk = 0x18,
    FetchError = 0x19,
    ClientSetup = 0x40,
    ServerSetup = 0x41,
}

impl MessageType {
    /// Maps a raw type identifier to its [`MessageType`].
    ///
    /// Returns `None` for any value that is not one of the enum's
    /// discriminants.
    pub fn from_id(id: u64) -> Option<Self> {
        let kind = match id {
            0x02 => Self::SubscribeUpdate,
            0x03 => Self::Subscribe,
            0x04 => Self::SubscribeOk,
            0x05 => Self::SubscribeError,
            0x06 => Self::Announce,
            0x07 => Self::AnnounceOk,
            0x08 => Self::AnnounceError,
            0x09 => Self::Unannounce,
            0x0A => Self::Unsubscribe,
            0x0B => Self::SubscribeDone,
            0x0C => Self::AnnounceCancel,
            0x0D => Self::TrackStatusRequest,
            0x0E => Self::TrackStatus,
            0x10 => Self::GoAway,
            0x11 => Self::SubscribeAnnounces,
            0x12 => Self::SubscribeAnnouncesOk,
            0x13 => Self::SubscribeAnnouncesError,
            0x14 => Self::UnsubscribeAnnounces,
            0x15 => Self::MaxSubscribeId,
            0x16 => Self::Fetch,
            0x17 => Self::FetchCancel,
            0x18 => Self::FetchOk,
            0x19 => Self::FetchError,
            0x40 => Self::ClientSetup,
            0x41 => Self::ServerSetup,
            // Anything else is not a message type known to this codec.
            _ => return None,
        };
        Some(kind)
    }

    /// Returns the raw type identifier (the enum discriminant) of `self`.
    pub fn id(&self) -> u64 {
        let kind: Self = *self;
        kind as u64
    }
}
/// Body of a `ClientSetup` control message (wire type id `0x40`).
///
/// Wire order: varint version count, that many varint versions, then the
/// parameter list.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientSetup {
/// Versions carried in the message. The decoder rejects a version count of
/// zero with `CodecError::InvalidField`.
pub supported_versions: Vec<VarInt>,
/// Parameters, (de)serialized via `KeyValuePair::encode_list_d07` /
/// `KeyValuePair::decode_list_d07`.
pub parameters: Vec<KeyValuePair>,
}
/// Body of a `ServerSetup` control message (wire type id `0x41`).
///
/// Wire order: varint selected version, then the parameter list.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerSetup {
/// Single version value carried in the message.
pub selected_version: VarInt,
/// Parameters, (de)serialized via `KeyValuePair::encode_list_d07` /
/// `KeyValuePair::decode_list_d07`.
pub parameters: Vec<KeyValuePair>,
}
/// Body of a `GoAway` control message (wire type id `0x10`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GoAway {
/// Raw URI bytes, written as a varint length followed by the bytes.
/// Encoding fails with `CodecError::GoAwayUriTooLong` when the length
/// exceeds `MAX_GOAWAY_URI_LENGTH`.
pub new_session_uri: Vec<u8>,
}
/// Body of a `MaxSubscribeId` control message (wire type id `0x15`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MaxSubscribeId {
/// The only field of the message, encoded as a single varint.
pub subscribe_id: VarInt,
}
/// Body of a `Subscribe` control message (wire type id `0x03`).
///
/// When decoding, `filter_type` determines which optional fields are read:
/// `start_location` for `FilterType::AbsoluteStart` and
/// `FilterType::AbsoluteRange`, and `end_group`/`end_object` for
/// `FilterType::AbsoluteRange` only. In every other case they are `None`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Subscribe {
/// First varint of the body.
pub subscribe_id: VarInt,
/// Second varint of the body.
pub track_alias: VarInt,
/// Namespace, (de)serialized by `TrackNamespace` itself.
pub track_namespace: TrackNamespace,
/// Raw track name bytes, written as a varint length followed by the bytes.
pub track_name: Vec<u8>,
/// Single byte on the wire.
pub subscriber_priority: u8,
/// Single byte on the wire; the decoder rejects values that
/// `GroupOrder::from_u8` does not recognize.
pub group_order: GroupOrder,
/// Single byte on the wire; the decoder rejects values that
/// `FilterType::from_u8` does not recognize.
pub filter_type: FilterType,
/// Present after decoding only for the absolute filter types (see above).
pub start_location: Option<Location>,
/// Present after decoding only for `FilterType::AbsoluteRange`.
pub end_group: Option<VarInt>,
/// Present after decoding only for `FilterType::AbsoluteRange`.
pub end_object: Option<VarInt>,
/// Trailing parameter list.
pub parameters: Vec<KeyValuePair>,
}
/// Body of a `SubscribeOk` control message (wire type id `0x04`).
///
/// When decoding, `largest_group_id` and `largest_object_id` are read (and
/// set to `Some`) only if `content_exists` is
/// `ContentExists::HasLargestLocation`; otherwise both are `None`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeOk {
/// First varint of the body.
pub subscribe_id: VarInt,
/// Second varint of the body.
pub expires: VarInt,
/// Single byte on the wire; the decoder rejects values that
/// `GroupOrder::from_u8` does not recognize.
pub group_order: GroupOrder,
/// Single byte on the wire; the decoder accepts only `0` and `1`.
pub content_exists: ContentExists,
/// See the struct-level note on `content_exists`.
pub largest_group_id: Option<VarInt>,
/// See the struct-level note on `content_exists`.
pub largest_object_id: Option<VarInt>,
/// Trailing parameter list.
pub parameters: Vec<KeyValuePair>,
}
/// Body of a `SubscribeError` control message (wire type id `0x05`).
///
/// Wire order: `subscribe_id`, `error_code`, length-prefixed
/// `reason_phrase`, then `track_alias`. Note that this differs from the
/// field order of the struct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeError {
pub subscribe_id: VarInt,
pub error_code: VarInt,
/// Raw bytes; encoding fails with `CodecError::ReasonPhraseTooLong` when
/// the length exceeds `MAX_REASON_PHRASE_LENGTH`.
pub reason_phrase: Vec<u8>,
/// Written last on the wire, after the reason phrase.
pub track_alias: VarInt,
}
/// Body of a `SubscribeUpdate` control message (wire type id `0x02`).
///
/// Wire order matches the field order: five varints, one priority byte,
/// then the parameter list.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeUpdate {
pub subscribe_id: VarInt,
pub start_group: VarInt,
pub start_object: VarInt,
pub end_group: VarInt,
pub end_object: VarInt,
/// Single byte on the wire.
pub subscriber_priority: u8,
/// Trailing parameter list.
pub parameters: Vec<KeyValuePair>,
}
/// Body of a `SubscribeDone` control message (wire type id `0x0B`).
///
/// When decoding, `final_group` and `final_object` are read (and set to
/// `Some`) only if `content_exists` is `ContentExists::HasLargestLocation`;
/// otherwise both are `None`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeDone {
pub subscribe_id: VarInt,
pub status_code: VarInt,
/// Raw bytes; encoding fails with `CodecError::ReasonPhraseTooLong` when
/// the length exceeds `MAX_REASON_PHRASE_LENGTH`.
pub reason_phrase: Vec<u8>,
/// Single byte on the wire; the decoder accepts only `0` and `1`.
pub content_exists: ContentExists,
/// See the struct-level note on `content_exists`.
pub final_group: Option<VarInt>,
/// See the struct-level note on `content_exists`.
pub final_object: Option<VarInt>,
}
/// Body of an `Unsubscribe` control message (wire type id `0x0A`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Unsubscribe {
/// The only field of the message, encoded as a single varint.
pub subscribe_id: VarInt,
}
/// Body of an `Announce` control message (wire type id `0x06`).
///
/// Wire order: namespace, then the parameter list.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Announce {
/// Namespace, (de)serialized by `TrackNamespace` itself.
pub track_namespace: TrackNamespace,
/// Trailing parameter list.
pub parameters: Vec<KeyValuePair>,
}
/// Body of an `AnnounceOk` control message (wire type id `0x07`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnnounceOk {
/// The only field of the message, (de)serialized by `TrackNamespace`.
pub track_namespace: TrackNamespace,
}
/// Body of an `AnnounceError` control message (wire type id `0x08`).
///
/// Wire order matches the field order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnnounceError {
pub track_namespace: TrackNamespace,
pub error_code: VarInt,
/// Raw bytes, length-prefixed on the wire; encoding fails with
/// `CodecError::ReasonPhraseTooLong` when the length exceeds
/// `MAX_REASON_PHRASE_LENGTH`.
pub reason_phrase: Vec<u8>,
}
/// Body of an `AnnounceCancel` control message (wire type id `0x0C`).
///
/// Wire order matches the field order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnnounceCancel {
pub track_namespace: TrackNamespace,
pub error_code: VarInt,
/// Raw bytes, length-prefixed on the wire; encoding fails with
/// `CodecError::ReasonPhraseTooLong` when the length exceeds
/// `MAX_REASON_PHRASE_LENGTH`.
pub reason_phrase: Vec<u8>,
}
/// Body of an `Unannounce` control message (wire type id `0x09`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Unannounce {
/// The only field of the message, (de)serialized by `TrackNamespace`.
pub track_namespace: TrackNamespace,
}
/// Body of a `SubscribeAnnounces` control message (wire type id `0x11`).
///
/// Wire order: namespace prefix, then the parameter list.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeAnnounces {
/// Prefix, (de)serialized by `TrackNamespace` itself.
pub track_namespace_prefix: TrackNamespace,
/// Trailing parameter list.
pub parameters: Vec<KeyValuePair>,
}
/// Body of a `SubscribeAnnouncesOk` control message (wire type id `0x12`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeAnnouncesOk {
/// The only field of the message, (de)serialized by `TrackNamespace`.
pub track_namespace_prefix: TrackNamespace,
}
/// Body of a `SubscribeAnnouncesError` control message (wire type id `0x13`).
///
/// Wire order matches the field order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeAnnouncesError {
pub track_namespace_prefix: TrackNamespace,
pub error_code: VarInt,
/// Raw bytes, length-prefixed on the wire; encoding fails with
/// `CodecError::ReasonPhraseTooLong` when the length exceeds
/// `MAX_REASON_PHRASE_LENGTH`.
pub reason_phrase: Vec<u8>,
}
/// Body of an `UnsubscribeAnnounces` control message (wire type id `0x14`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnsubscribeAnnounces {
/// The only field of the message, (de)serialized by `TrackNamespace`.
pub track_namespace_prefix: TrackNamespace,
}
/// Body of a `TrackStatusRequest` control message (wire type id `0x0D`).
///
/// Wire order: namespace, then the length-prefixed track name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TrackStatusRequest {
/// Namespace, (de)serialized by `TrackNamespace` itself.
pub track_namespace: TrackNamespace,
/// Raw track name bytes, written as a varint length followed by the bytes.
pub track_name: Vec<u8>,
}
/// Body of a `TrackStatus` control message (wire type id `0x0E`).
///
/// Wire order matches the field order; the last three fields are always
/// present as varints.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TrackStatus {
/// Namespace, (de)serialized by `TrackNamespace` itself.
pub track_namespace: TrackNamespace,
/// Raw track name bytes, written as a varint length followed by the bytes.
pub track_name: Vec<u8>,
pub status_code: VarInt,
pub last_group_id: VarInt,
pub last_object_id: VarInt,
}
/// Body of a `Fetch` control message (wire type id `0x16`).
///
/// Wire order matches the field order. Unlike `Subscribe`, the start/end
/// group and object fields are unconditional.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Fetch {
pub subscribe_id: VarInt,
/// Namespace, (de)serialized by `TrackNamespace` itself.
pub track_namespace: TrackNamespace,
/// Raw track name bytes, written as a varint length followed by the bytes.
pub track_name: Vec<u8>,
/// Single byte on the wire.
pub subscriber_priority: u8,
/// Single byte on the wire; the decoder rejects values that
/// `GroupOrder::from_u8` does not recognize.
pub group_order: GroupOrder,
pub start_group: VarInt,
pub start_object: VarInt,
pub end_group: VarInt,
pub end_object: VarInt,
/// Trailing parameter list.
pub parameters: Vec<KeyValuePair>,
}
/// Body of a `FetchOk` control message (wire type id `0x18`).
///
/// The decoder reads `largest_group_id` and `largest_object_id`
/// unconditionally, so a decoded value always has both set to `Some`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchOk {
pub subscribe_id: VarInt,
/// Single byte on the wire; the decoder rejects values that
/// `GroupOrder::from_u8` does not recognize.
pub group_order: GroupOrder,
/// Single byte on the wire, passed through without validation.
pub end_of_track: u8,
/// Always `Some` after decoding (see the struct-level note).
pub largest_group_id: Option<VarInt>,
/// Always `Some` after decoding (see the struct-level note).
pub largest_object_id: Option<VarInt>,
/// Trailing parameter list.
pub parameters: Vec<KeyValuePair>,
}
/// Body of a `FetchError` control message (wire type id `0x19`).
///
/// Wire order matches the field order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchError {
pub subscribe_id: VarInt,
pub error_code: VarInt,
/// Raw bytes, length-prefixed on the wire; encoding fails with
/// `CodecError::ReasonPhraseTooLong` when the length exceeds
/// `MAX_REASON_PHRASE_LENGTH`.
pub reason_phrase: Vec<u8>,
}
/// Body of a `FetchCancel` control message (wire type id `0x17`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchCancel {
/// The only field of the message, encoded as a single varint.
pub subscribe_id: VarInt,
}
/// A decoded control message: one variant per [`MessageType`], each wrapping
/// the struct that holds that message's body fields.
///
/// Framing (type id + length prefix) is handled by `ControlMessage::encode`
/// and `ControlMessage::decode`; the variants carry body fields only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ControlMessage {
ClientSetup(ClientSetup),
ServerSetup(ServerSetup),
GoAway(GoAway),
MaxSubscribeId(MaxSubscribeId),
Subscribe(Subscribe),
SubscribeOk(SubscribeOk),
SubscribeError(SubscribeError),
SubscribeUpdate(SubscribeUpdate),
SubscribeDone(SubscribeDone),
Unsubscribe(Unsubscribe),
Announce(Announce),
AnnounceOk(AnnounceOk),
AnnounceError(AnnounceError),
AnnounceCancel(AnnounceCancel),
Unannounce(Unannounce),
SubscribeAnnounces(SubscribeAnnounces),
SubscribeAnnouncesOk(SubscribeAnnouncesOk),
SubscribeAnnouncesError(SubscribeAnnouncesError),
UnsubscribeAnnounces(UnsubscribeAnnounces),
TrackStatusRequest(TrackStatusRequest),
TrackStatus(TrackStatus),
Fetch(Fetch),
FetchOk(FetchOk),
FetchError(FetchError),
FetchCancel(FetchCancel),
}
impl ControlMessage {
pub fn message_type(&self) -> MessageType {
match self {
ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
ControlMessage::GoAway(_) => MessageType::GoAway,
ControlMessage::MaxSubscribeId(_) => MessageType::MaxSubscribeId,
ControlMessage::Subscribe(_) => MessageType::Subscribe,
ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
ControlMessage::SubscribeError(_) => MessageType::SubscribeError,
ControlMessage::SubscribeUpdate(_) => MessageType::SubscribeUpdate,
ControlMessage::SubscribeDone(_) => MessageType::SubscribeDone,
ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
ControlMessage::Announce(_) => MessageType::Announce,
ControlMessage::AnnounceOk(_) => MessageType::AnnounceOk,
ControlMessage::AnnounceError(_) => MessageType::AnnounceError,
ControlMessage::AnnounceCancel(_) => MessageType::AnnounceCancel,
ControlMessage::Unannounce(_) => MessageType::Unannounce,
ControlMessage::SubscribeAnnounces(_) => MessageType::SubscribeAnnounces,
ControlMessage::SubscribeAnnouncesOk(_) => MessageType::SubscribeAnnouncesOk,
ControlMessage::SubscribeAnnouncesError(_) => MessageType::SubscribeAnnouncesError,
ControlMessage::UnsubscribeAnnounces(_) => MessageType::UnsubscribeAnnounces,
ControlMessage::TrackStatusRequest(_) => MessageType::TrackStatusRequest,
ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
ControlMessage::Fetch(_) => MessageType::Fetch,
ControlMessage::FetchOk(_) => MessageType::FetchOk,
ControlMessage::FetchError(_) => MessageType::FetchError,
ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
}
}
/// Writes the complete framed message to `buf`: varint type id, varint
/// body length, then the body bytes.
///
/// The body is first serialized into a temporary buffer so that its length
/// is known before anything is written to `buf`; on error `buf` is left
/// untouched.
///
/// # Errors
///
/// Propagates any error from `encode_payload`, and returns
/// `CodecError::MessageTooLong` (carrying the body length) when the body
/// exceeds `MAX_MESSAGE_LENGTH`.
pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
    let mut body: Vec<u8> = Vec::with_capacity(256);
    self.encode_payload(&mut body)?;

    let body_len = body.len();
    if body_len > MAX_MESSAGE_LENGTH {
        return Err(CodecError::MessageTooLong(body_len));
    }

    let type_id = self.message_type().id() as usize;
    VarInt::from_usize(type_id).encode(buf);
    VarInt::from_usize(body_len).encode(buf);
    buf.put_slice(body.as_slice());
    Ok(())
}
pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
let type_id = VarInt::decode(buf)?.into_inner();
let msg_type =
MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
let payload_len = VarInt::decode(buf)?.into_inner() as usize;
if buf.remaining() < payload_len {
return Err(CodecError::UnexpectedEnd);
}
let payload_bytes = buf.copy_to_bytes(payload_len);
let mut payload = &payload_bytes[..];
Self::decode_payload(msg_type, &mut payload)
}
fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
match self {
ControlMessage::ClientSetup(m) => {
VarInt::from_usize(m.supported_versions.len()).encode(buf);
for v in &m.supported_versions {
v.encode(buf);
}
KeyValuePair::encode_list_d07(&m.parameters, buf);
}
ControlMessage::ServerSetup(m) => {
m.selected_version.encode(buf);
KeyValuePair::encode_list_d07(&m.parameters, buf);
}
ControlMessage::GoAway(m) => {
if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
return Err(CodecError::GoAwayUriTooLong);
}
VarInt::from_usize(m.new_session_uri.len()).encode(buf);
buf.put_slice(&m.new_session_uri);
}
ControlMessage::MaxSubscribeId(m) => {
m.subscribe_id.encode(buf);
}
ControlMessage::Subscribe(m) => {
m.subscribe_id.encode(buf);
m.track_alias.encode(buf);
m.track_namespace.encode(buf);
VarInt::from_usize(m.track_name.len()).encode(buf);
buf.put_slice(&m.track_name);
buf.put_u8(m.subscriber_priority);
buf.put_u8(m.group_order as u8);
buf.put_u8(m.filter_type as u8);
if let Some(loc) = &m.start_location {
loc.encode(buf);
}
if let Some(eg) = &m.end_group {
eg.encode(buf);
}
if let Some(eo) = &m.end_object {
eo.encode(buf);
}
KeyValuePair::encode_list_d07(&m.parameters, buf);
}
ControlMessage::SubscribeOk(m) => {
m.subscribe_id.encode(buf);
m.expires.encode(buf);
buf.put_u8(m.group_order as u8);
buf.put_u8(m.content_exists as u8);
if let Some(gid) = &m.largest_group_id {
gid.encode(buf);
}
if let Some(oid) = &m.largest_object_id {
oid.encode(buf);
}
KeyValuePair::encode_list_d07(&m.parameters, buf);
}
ControlMessage::SubscribeError(m) => {
if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
return Err(CodecError::ReasonPhraseTooLong);
}
m.subscribe_id.encode(buf);
m.error_code.encode(buf);
VarInt::from_usize(m.reason_phrase.len()).encode(buf);
buf.put_slice(&m.reason_phrase);
m.track_alias.encode(buf);
}
ControlMessage::SubscribeUpdate(m) => {
m.subscribe_id.encode(buf);
m.start_group.encode(buf);
m.start_object.encode(buf);
m.end_group.encode(buf);
m.end_object.encode(buf);
buf.put_u8(m.subscriber_priority);
KeyValuePair::encode_list_d07(&m.parameters, buf);
}
ControlMessage::SubscribeDone(m) => {
if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
return Err(CodecError::ReasonPhraseTooLong);
}
m.subscribe_id.encode(buf);
m.status_code.encode(buf);
VarInt::from_usize(m.reason_phrase.len()).encode(buf);
buf.put_slice(&m.reason_phrase);
buf.put_u8(m.content_exists as u8);
if let Some(fg) = &m.final_group {
fg.encode(buf);
}
if let Some(fo) = &m.final_object {
fo.encode(buf);
}
}
ControlMessage::Unsubscribe(m) => {
m.subscribe_id.encode(buf);
}
ControlMessage::Announce(m) => {
m.track_namespace.encode(buf);
KeyValuePair::encode_list_d07(&m.parameters, buf);
}
ControlMessage::AnnounceOk(m) => {
m.track_namespace.encode(buf);
}
ControlMessage::AnnounceError(m) => {
if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
return Err(CodecError::ReasonPhraseTooLong);
}
m.track_namespace.encode(buf);
m.error_code.encode(buf);
VarInt::from_usize(m.reason_phrase.len()).encode(buf);
buf.put_slice(&m.reason_phrase);
}
ControlMessage::AnnounceCancel(m) => {
if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
return Err(CodecError::ReasonPhraseTooLong);
}
m.track_namespace.encode(buf);
m.error_code.encode(buf);
VarInt::from_usize(m.reason_phrase.len()).encode(buf);
buf.put_slice(&m.reason_phrase);
}
ControlMessage::Unannounce(m) => {
m.track_namespace.encode(buf);
}
ControlMessage::SubscribeAnnounces(m) => {
m.track_namespace_prefix.encode(buf);
KeyValuePair::encode_list_d07(&m.parameters, buf);
}
ControlMessage::SubscribeAnnouncesOk(m) => {
m.track_namespace_prefix.encode(buf);
}
ControlMessage::SubscribeAnnouncesError(m) => {
if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
return Err(CodecError::ReasonPhraseTooLong);
}
m.track_namespace_prefix.encode(buf);
m.error_code.encode(buf);
VarInt::from_usize(m.reason_phrase.len()).encode(buf);
buf.put_slice(&m.reason_phrase);
}
ControlMessage::UnsubscribeAnnounces(m) => {
m.track_namespace_prefix.encode(buf);
}
ControlMessage::TrackStatusRequest(m) => {
m.track_namespace.encode(buf);
VarInt::from_usize(m.track_name.len()).encode(buf);
buf.put_slice(&m.track_name);
}
ControlMessage::TrackStatus(m) => {
m.track_namespace.encode(buf);
VarInt::from_usize(m.track_name.len()).encode(buf);
buf.put_slice(&m.track_name);
m.status_code.encode(buf);
m.last_group_id.encode(buf);
m.last_object_id.encode(buf);
}
ControlMessage::Fetch(m) => {
m.subscribe_id.encode(buf);
m.track_namespace.encode(buf);
VarInt::from_usize(m.track_name.len()).encode(buf);
buf.put_slice(&m.track_name);
buf.put_u8(m.subscriber_priority);
buf.put_u8(m.group_order as u8);
m.start_group.encode(buf);
m.start_object.encode(buf);
m.end_group.encode(buf);
m.end_object.encode(buf);
KeyValuePair::encode_list_d07(&m.parameters, buf);
}
ControlMessage::FetchOk(m) => {
m.subscribe_id.encode(buf);
buf.put_u8(m.group_order as u8);
buf.put_u8(m.end_of_track);
if let Some(gid) = &m.largest_group_id {
gid.encode(buf);
}
if let Some(oid) = &m.largest_object_id {
oid.encode(buf);
}
KeyValuePair::encode_list_d07(&m.parameters, buf);
}
ControlMessage::FetchError(m) => {
if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
return Err(CodecError::ReasonPhraseTooLong);
}
m.subscribe_id.encode(buf);
m.error_code.encode(buf);
VarInt::from_usize(m.reason_phrase.len()).encode(buf);
buf.put_slice(&m.reason_phrase);
}
ControlMessage::FetchCancel(m) => {
m.subscribe_id.encode(buf);
}
}
Ok(())
}
/// Parses the body of a control message of kind `msg_type` from `buf`.
///
/// `buf` is expected to hold just the body (the caller has already
/// stripped the type id and length prefix). Fields are read in the same
/// order `encode_payload` writes them.
///
/// # Errors
///
/// * `CodecError::UnexpectedEnd` when a fixed-size field is missing, or a
///   declared byte-string length cannot be represented on this platform.
/// * `CodecError::InvalidField` for a zero version count in `ClientSetup`
///   or an unrecognized group-order, filter-type or content-exists byte.
/// * Any error surfaced by `VarInt::decode`, `read_bytes`,
///   `TrackNamespace::decode`, `Location::decode` or
///   `KeyValuePair::decode_list_d07`.
fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
    match msg_type {
        MessageType::ClientSetup => {
            let version_count = VarInt::decode(buf)?.into_inner();
            if version_count == 0 {
                return Err(CodecError::InvalidField);
            }
            // The count comes straight off the wire. Each version needs at
            // least one byte, so never reserve more slots than there are
            // bytes left; otherwise a tiny message could request an
            // enormous (or overflowing) allocation.
            let prealloc = version_count.min(buf.remaining() as u64) as usize;
            let mut supported_versions = Vec::with_capacity(prealloc);
            for _ in 0..version_count {
                supported_versions.push(VarInt::decode(buf)?);
            }
            let parameters = KeyValuePair::decode_list_d07(buf)?;
            Ok(ControlMessage::ClientSetup(ClientSetup { supported_versions, parameters }))
        }
        MessageType::ServerSetup => {
            let selected_version = VarInt::decode(buf)?;
            let parameters = KeyValuePair::decode_list_d07(buf)?;
            Ok(ControlMessage::ServerSetup(ServerSetup { selected_version, parameters }))
        }
        MessageType::GoAway => {
            let new_session_uri = Self::decode_len_prefixed_bytes(buf)?;
            Ok(ControlMessage::GoAway(GoAway { new_session_uri }))
        }
        MessageType::MaxSubscribeId => {
            let subscribe_id = VarInt::decode(buf)?;
            Ok(ControlMessage::MaxSubscribeId(MaxSubscribeId { subscribe_id }))
        }
        MessageType::Subscribe => {
            let subscribe_id = VarInt::decode(buf)?;
            let track_alias = VarInt::decode(buf)?;
            let track_namespace = TrackNamespace::decode(buf)?;
            let track_name = Self::decode_len_prefixed_bytes(buf)?;
            // priority + group order + filter type are one byte each.
            if buf.remaining() < 3 {
                return Err(CodecError::UnexpectedEnd);
            }
            let subscriber_priority = buf.get_u8();
            let group_order =
                GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
            let filter_val = buf.get_u8();
            let filter_type =
                FilterType::from_u8(filter_val).ok_or(CodecError::InvalidField)?;
            let start_location = match filter_type {
                FilterType::AbsoluteStart | FilterType::AbsoluteRange => {
                    Some(Location::decode(buf)?)
                }
                _ => None,
            };
            let (end_group, end_object) = match filter_type {
                FilterType::AbsoluteRange => {
                    let eg = VarInt::decode(buf)?;
                    let eo = VarInt::decode(buf)?;
                    (Some(eg), Some(eo))
                }
                _ => (None, None),
            };
            let parameters = KeyValuePair::decode_list_d07(buf)?;
            Ok(ControlMessage::Subscribe(Subscribe {
                subscribe_id,
                track_alias,
                track_namespace,
                track_name,
                subscriber_priority,
                group_order,
                filter_type,
                start_location,
                end_group,
                end_object,
                parameters,
            }))
        }
        MessageType::SubscribeOk => {
            let subscribe_id = VarInt::decode(buf)?;
            let expires = VarInt::decode(buf)?;
            // group order + content-exists flag are one byte each.
            if buf.remaining() < 2 {
                return Err(CodecError::UnexpectedEnd);
            }
            let group_order =
                GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
            let content_exists_val = buf.get_u8();
            let content_exists = match content_exists_val {
                0 => ContentExists::NoLargestLocation,
                1 => ContentExists::HasLargestLocation,
                _ => return Err(CodecError::InvalidField),
            };
            let (largest_group_id, largest_object_id) =
                if content_exists == ContentExists::HasLargestLocation {
                    let gid = VarInt::decode(buf)?;
                    let oid = VarInt::decode(buf)?;
                    (Some(gid), Some(oid))
                } else {
                    (None, None)
                };
            let parameters = KeyValuePair::decode_list_d07(buf)?;
            Ok(ControlMessage::SubscribeOk(SubscribeOk {
                subscribe_id,
                expires,
                group_order,
                content_exists,
                largest_group_id,
                largest_object_id,
                parameters,
            }))
        }
        MessageType::SubscribeError => {
            let subscribe_id = VarInt::decode(buf)?;
            let error_code = VarInt::decode(buf)?;
            let reason_phrase = Self::decode_len_prefixed_bytes(buf)?;
            // track_alias trails the reason phrase on the wire.
            let track_alias = VarInt::decode(buf)?;
            Ok(ControlMessage::SubscribeError(SubscribeError {
                subscribe_id,
                error_code,
                reason_phrase,
                track_alias,
            }))
        }
        MessageType::SubscribeUpdate => {
            let subscribe_id = VarInt::decode(buf)?;
            let start_group = VarInt::decode(buf)?;
            let start_object = VarInt::decode(buf)?;
            let end_group = VarInt::decode(buf)?;
            let end_object = VarInt::decode(buf)?;
            if buf.remaining() < 1 {
                return Err(CodecError::UnexpectedEnd);
            }
            let subscriber_priority = buf.get_u8();
            let parameters = KeyValuePair::decode_list_d07(buf)?;
            Ok(ControlMessage::SubscribeUpdate(SubscribeUpdate {
                subscribe_id,
                start_group,
                start_object,
                end_group,
                end_object,
                subscriber_priority,
                parameters,
            }))
        }
        MessageType::SubscribeDone => {
            let subscribe_id = VarInt::decode(buf)?;
            let status_code = VarInt::decode(buf)?;
            let reason_phrase = Self::decode_len_prefixed_bytes(buf)?;
            if buf.remaining() < 1 {
                return Err(CodecError::UnexpectedEnd);
            }
            let content_exists_val = buf.get_u8();
            let content_exists = match content_exists_val {
                0 => ContentExists::NoLargestLocation,
                1 => ContentExists::HasLargestLocation,
                _ => return Err(CodecError::InvalidField),
            };
            let (final_group, final_object) =
                if content_exists == ContentExists::HasLargestLocation {
                    let fg = VarInt::decode(buf)?;
                    let fo = VarInt::decode(buf)?;
                    (Some(fg), Some(fo))
                } else {
                    (None, None)
                };
            Ok(ControlMessage::SubscribeDone(SubscribeDone {
                subscribe_id,
                status_code,
                reason_phrase,
                content_exists,
                final_group,
                final_object,
            }))
        }
        MessageType::Unsubscribe => {
            let subscribe_id = VarInt::decode(buf)?;
            Ok(ControlMessage::Unsubscribe(Unsubscribe { subscribe_id }))
        }
        MessageType::Announce => {
            let track_namespace = TrackNamespace::decode(buf)?;
            let parameters = KeyValuePair::decode_list_d07(buf)?;
            Ok(ControlMessage::Announce(Announce { track_namespace, parameters }))
        }
        MessageType::AnnounceOk => {
            let track_namespace = TrackNamespace::decode(buf)?;
            Ok(ControlMessage::AnnounceOk(AnnounceOk { track_namespace }))
        }
        MessageType::AnnounceError => {
            let track_namespace = TrackNamespace::decode(buf)?;
            let error_code = VarInt::decode(buf)?;
            let reason_phrase = Self::decode_len_prefixed_bytes(buf)?;
            Ok(ControlMessage::AnnounceError(AnnounceError {
                track_namespace,
                error_code,
                reason_phrase,
            }))
        }
        MessageType::AnnounceCancel => {
            let track_namespace = TrackNamespace::decode(buf)?;
            let error_code = VarInt::decode(buf)?;
            let reason_phrase = Self::decode_len_prefixed_bytes(buf)?;
            Ok(ControlMessage::AnnounceCancel(AnnounceCancel {
                track_namespace,
                error_code,
                reason_phrase,
            }))
        }
        MessageType::Unannounce => {
            let track_namespace = TrackNamespace::decode(buf)?;
            Ok(ControlMessage::Unannounce(Unannounce { track_namespace }))
        }
        MessageType::SubscribeAnnounces => {
            let track_namespace_prefix = TrackNamespace::decode(buf)?;
            let parameters = KeyValuePair::decode_list_d07(buf)?;
            Ok(ControlMessage::SubscribeAnnounces(SubscribeAnnounces {
                track_namespace_prefix,
                parameters,
            }))
        }
        MessageType::SubscribeAnnouncesOk => {
            let track_namespace_prefix = TrackNamespace::decode(buf)?;
            Ok(ControlMessage::SubscribeAnnouncesOk(SubscribeAnnouncesOk {
                track_namespace_prefix,
            }))
        }
        MessageType::SubscribeAnnouncesError => {
            let track_namespace_prefix = TrackNamespace::decode(buf)?;
            let error_code = VarInt::decode(buf)?;
            let reason_phrase = Self::decode_len_prefixed_bytes(buf)?;
            Ok(ControlMessage::SubscribeAnnouncesError(SubscribeAnnouncesError {
                track_namespace_prefix,
                error_code,
                reason_phrase,
            }))
        }
        MessageType::UnsubscribeAnnounces => {
            let track_namespace_prefix = TrackNamespace::decode(buf)?;
            Ok(ControlMessage::UnsubscribeAnnounces(UnsubscribeAnnounces {
                track_namespace_prefix,
            }))
        }
        MessageType::TrackStatusRequest => {
            let track_namespace = TrackNamespace::decode(buf)?;
            let track_name = Self::decode_len_prefixed_bytes(buf)?;
            Ok(ControlMessage::TrackStatusRequest(TrackStatusRequest {
                track_namespace,
                track_name,
            }))
        }
        MessageType::TrackStatus => {
            let track_namespace = TrackNamespace::decode(buf)?;
            let track_name = Self::decode_len_prefixed_bytes(buf)?;
            let status_code = VarInt::decode(buf)?;
            let last_group_id = VarInt::decode(buf)?;
            let last_object_id = VarInt::decode(buf)?;
            Ok(ControlMessage::TrackStatus(TrackStatus {
                track_namespace,
                track_name,
                status_code,
                last_group_id,
                last_object_id,
            }))
        }
        MessageType::Fetch => {
            let subscribe_id = VarInt::decode(buf)?;
            let track_namespace = TrackNamespace::decode(buf)?;
            let track_name = Self::decode_len_prefixed_bytes(buf)?;
            // priority + group order are one byte each.
            if buf.remaining() < 2 {
                return Err(CodecError::UnexpectedEnd);
            }
            let subscriber_priority = buf.get_u8();
            let group_order =
                GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
            let start_group = VarInt::decode(buf)?;
            let start_object = VarInt::decode(buf)?;
            let end_group = VarInt::decode(buf)?;
            let end_object = VarInt::decode(buf)?;
            let parameters = KeyValuePair::decode_list_d07(buf)?;
            Ok(ControlMessage::Fetch(Fetch {
                subscribe_id,
                track_namespace,
                track_name,
                subscriber_priority,
                group_order,
                start_group,
                start_object,
                end_group,
                end_object,
                parameters,
            }))
        }
        MessageType::FetchOk => {
            let subscribe_id = VarInt::decode(buf)?;
            // group order + end-of-track are one byte each.
            if buf.remaining() < 2 {
                return Err(CodecError::UnexpectedEnd);
            }
            let group_order =
                GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
            let end_of_track = buf.get_u8();
            // Both largest ids are unconditional in this message.
            let largest_group_id = Some(VarInt::decode(buf)?);
            let largest_object_id = Some(VarInt::decode(buf)?);
            let parameters = KeyValuePair::decode_list_d07(buf)?;
            Ok(ControlMessage::FetchOk(FetchOk {
                subscribe_id,
                group_order,
                end_of_track,
                largest_group_id,
                largest_object_id,
                parameters,
            }))
        }
        MessageType::FetchError => {
            let subscribe_id = VarInt::decode(buf)?;
            let error_code = VarInt::decode(buf)?;
            let reason_phrase = Self::decode_len_prefixed_bytes(buf)?;
            Ok(ControlMessage::FetchError(FetchError {
                subscribe_id,
                error_code,
                reason_phrase,
            }))
        }
        MessageType::FetchCancel => {
            let subscribe_id = VarInt::decode(buf)?;
            Ok(ControlMessage::FetchCancel(FetchCancel { subscribe_id }))
        }
    }
}

/// Reads a varint length followed by that many raw bytes (via `read_bytes`).
///
/// A length that does not fit in `usize` is reported as
/// `CodecError::UnexpectedEnd` instead of being silently truncated by a
/// narrowing cast.
fn decode_len_prefixed_bytes(buf: &mut impl Buf) -> Result<Vec<u8>, CodecError> {
    let declared = VarInt::decode(buf)?.into_inner();
    if declared > usize::MAX as u64 {
        return Err(CodecError::UnexpectedEnd);
    }
    Ok(read_bytes(buf, declared as usize)?)
}
}