use std::borrow::Cow;
use crate::{
Path,
coding::{Decode, DecodeError, Encode, EncodeError, Sizer},
};
use super::{Message, Version};
#[derive(Clone, Debug)]
pub struct Subscribe<'a> {
pub id: u64,
pub broadcast: Path<'a>,
pub track: Cow<'a, str>,
pub priority: u8,
pub ordered: bool,
pub max_latency: std::time::Duration,
pub start_group: Option<u64>,
pub end_group: Option<u64>,
}
impl Message for Subscribe<'_> {
fn decode_msg<R: bytes::Buf>(r: &mut R, version: Version) -> Result<Self, DecodeError> {
let id = u64::decode(r, version)?;
let broadcast = Path::decode(r, version)?;
let track = Cow::<str>::decode(r, version)?;
let priority = u8::decode(r, version)?;
let (ordered, max_latency, start_group, end_group) = match version {
Version::Lite01 | Version::Lite02 => (false, std::time::Duration::ZERO, None, None),
_ => {
let ordered = u8::decode(r, version)? != 0;
let max_latency = std::time::Duration::decode(r, version)?;
let start_group = Option::<u64>::decode(r, version)?;
let end_group = Option::<u64>::decode(r, version)?;
(ordered, max_latency, start_group, end_group)
}
};
Ok(Self {
id,
broadcast,
track,
priority,
ordered,
max_latency,
start_group,
end_group,
})
}
fn encode_msg<W: bytes::BufMut>(&self, w: &mut W, version: Version) -> Result<(), EncodeError> {
self.id.encode(w, version)?;
self.broadcast.encode(w, version)?;
self.track.encode(w, version)?;
self.priority.encode(w, version)?;
match version {
Version::Lite01 | Version::Lite02 => {}
_ => {
(self.ordered as u8).encode(w, version)?;
self.max_latency.encode(w, version)?;
self.start_group.encode(w, version)?;
self.end_group.encode(w, version)?;
}
}
Ok(())
}
}
#[derive(Clone, Debug)]
pub struct SubscribeOk {
pub priority: u8,
pub ordered: bool,
pub max_latency: std::time::Duration,
pub start_group: Option<u64>,
pub end_group: Option<u64>,
}
impl Message for SubscribeOk {
fn encode_msg<W: bytes::BufMut>(&self, w: &mut W, version: Version) -> Result<(), EncodeError> {
match version {
Version::Lite01 => {
self.priority.encode(w, version)?;
}
Version::Lite02 => {}
_ => {
self.priority.encode(w, version)?;
(self.ordered as u8).encode(w, version)?;
self.max_latency.encode(w, version)?;
self.start_group.encode(w, version)?;
self.end_group.encode(w, version)?;
}
}
Ok(())
}
fn decode_msg<R: bytes::Buf>(r: &mut R, version: Version) -> Result<Self, DecodeError> {
match version {
Version::Lite01 => Ok(Self {
priority: u8::decode(r, version)?,
ordered: false,
max_latency: std::time::Duration::ZERO,
start_group: None,
end_group: None,
}),
Version::Lite02 => Ok(Self {
priority: 0,
ordered: false,
max_latency: std::time::Duration::ZERO,
start_group: None,
end_group: None,
}),
_ => {
let priority = u8::decode(r, version)?;
let ordered = u8::decode(r, version)? != 0;
let max_latency = std::time::Duration::decode(r, version)?;
let start_group = Option::<u64>::decode(r, version)?;
let end_group = Option::<u64>::decode(r, version)?;
Ok(Self {
priority,
ordered,
max_latency,
start_group,
end_group,
})
}
}
}
}
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct SubscribeUpdate {
pub priority: u8,
pub ordered: bool,
pub max_latency: std::time::Duration,
pub start_group: Option<u64>,
pub end_group: Option<u64>,
}
impl Message for SubscribeUpdate {
fn decode_msg<R: bytes::Buf>(r: &mut R, version: Version) -> Result<Self, DecodeError> {
match version {
Version::Lite01 | Version::Lite02 => {
return Err(DecodeError::Version);
}
_ => {}
}
let priority = u8::decode(r, version)?;
let ordered = u8::decode(r, version)? != 0;
let max_latency = std::time::Duration::decode(r, version)?;
let start_group = match u64::decode(r, version)? {
0 => None,
group => Some(group - 1),
};
let end_group = match u64::decode(r, version)? {
0 => None,
group => Some(group - 1),
};
Ok(Self {
priority,
ordered,
max_latency,
start_group,
end_group,
})
}
fn encode_msg<W: bytes::BufMut>(&self, w: &mut W, version: Version) -> Result<(), EncodeError> {
match version {
Version::Lite01 | Version::Lite02 => {
return Err(EncodeError::Version);
}
_ => {}
}
self.priority.encode(w, version)?;
(self.ordered as u8).encode(w, version)?;
self.max_latency.encode(w, version)?;
match self.start_group {
Some(start_group) => start_group
.checked_add(1)
.ok_or(EncodeError::TooLarge)?
.encode(w, version)?,
None => 0u64.encode(w, version)?,
}
match self.end_group {
Some(end_group) => end_group
.checked_add(1)
.ok_or(EncodeError::TooLarge)?
.encode(w, version)?,
None => 0u64.encode(w, version)?,
}
Ok(())
}
}
#[derive(Clone, Debug)]
pub struct SubscribeDrop {
pub start: u64,
pub end: u64,
pub error: u64,
}
impl Message for SubscribeDrop {
fn decode_msg<R: bytes::Buf>(r: &mut R, version: Version) -> Result<Self, DecodeError> {
match version {
Version::Lite01 | Version::Lite02 => {
return Err(DecodeError::Version);
}
_ => {}
}
Ok(Self {
start: u64::decode(r, version)?,
end: u64::decode(r, version)?,
error: u64::decode(r, version)?,
})
}
fn encode_msg<W: bytes::BufMut>(&self, w: &mut W, version: Version) -> Result<(), EncodeError> {
match version {
Version::Lite01 | Version::Lite02 => {
return Err(EncodeError::Version);
}
_ => {}
}
self.start.encode(w, version)?;
self.end.encode(w, version)?;
self.error.encode(w, version)?;
Ok(())
}
}
#[derive(Clone, Debug)]
pub enum SubscribeResponse {
Ok(SubscribeOk),
Drop(SubscribeDrop),
}
impl Encode<Version> for SubscribeResponse {
fn encode<W: bytes::BufMut>(&self, w: &mut W, version: Version) -> Result<(), EncodeError> {
match version {
Version::Lite01 | Version::Lite02 => match self {
Self::Ok(ok) => {
let mut sizer = Sizer::default();
Message::encode_msg(ok, &mut sizer, version)?;
sizer.size.encode(w, version)?;
Message::encode_msg(ok, w, version)?;
}
Self::Drop(_) => {
return Err(EncodeError::Version);
}
},
_ => match self {
Self::Ok(ok) => {
0u64.encode(w, version)?;
let mut sizer = Sizer::default();
Message::encode_msg(ok, &mut sizer, version)?;
sizer.size.encode(w, version)?;
Message::encode_msg(ok, w, version)?;
}
Self::Drop(drop) => {
1u64.encode(w, version)?;
let mut sizer = Sizer::default();
Message::encode_msg(drop, &mut sizer, version)?;
sizer.size.encode(w, version)?;
Message::encode_msg(drop, w, version)?;
}
},
}
Ok(())
}
}
impl Decode<Version> for SubscribeResponse {
fn decode<B: bytes::Buf>(buf: &mut B, version: Version) -> Result<Self, DecodeError> {
match version {
Version::Lite01 | Version::Lite02 => Ok(Self::Ok(SubscribeOk::decode(buf, version)?)),
_ => {
let typ = u64::decode(buf, version)?;
match typ {
0 => Ok(Self::Ok(SubscribeOk::decode(buf, version)?)),
1 => Ok(Self::Drop(SubscribeDrop::decode(buf, version)?)),
_ => Err(DecodeError::InvalidMessage(typ)),
}
}
}
}
}