use std::borrow::Cow;
use crate::{
Path,
coding::{Decode, DecodeError, Encode, EncodeError},
ietf::{
FilterType, GroupOrder, Location, Parameters, RequestId,
namespace::{decode_namespace, encode_namespace},
},
};
use super::Message;
use super::Version;
/// `PublishDone` control message (message ID `0x0b`).
///
/// Wire layout, in order: the request id (every version except `Draft17`),
/// `status_code`, `stream_count`, `reason_phrase`.
#[derive(Clone, Debug)]
pub struct PublishDone<'a> {
/// Request id written ahead of the other fields.
///
/// Must be `Some` for every version other than `Draft17` and `None` for
/// `Draft17`; `encode_msg` panics otherwise. Decoding a `Draft17` message
/// always yields `None`.
pub request_id: Option<RequestId>,
/// Status code, encoded as a `u64` right after the optional request id.
pub status_code: u64,
/// Stream count, encoded as a `u64` after `status_code`.
// NOTE(review): presumably the number of streams the publisher opened — confirm against the spec.
pub stream_count: u64,
/// Text encoded last; borrowed or owned via `Cow`.
pub reason_phrase: Cow<'a, str>,
}
impl Message for PublishDone<'_> {
    const ID: u64 = 0x0b;

    /// Writes the message body for `version`.
    ///
    /// Every version other than `Draft17` starts with the request id; `Draft17`
    /// omits it. `status_code`, `stream_count` and `reason_phrase` follow in
    /// that order.
    ///
    /// # Panics
    ///
    /// Panics if `request_id` is `Some` for `Draft17`, or `None` for any other
    /// version.
    fn encode_msg<W: bytes::BufMut>(&self, w: &mut W, version: Version) -> Result<(), EncodeError> {
        if version == Version::Draft17 {
            assert!(self.request_id.is_none(), "request_id must be None for draft17");
        } else {
            let id = self.request_id.expect("request_id required for draft14-16");
            id.encode(w, version)?;
        }

        self.status_code.encode(w, version)?;
        self.stream_count.encode(w, version)?;
        self.reason_phrase.encode(w, version)?;
        Ok(())
    }

    /// Reads the message body for `version`.
    ///
    /// `request_id` is `None` for `Draft17` (nothing is read for it) and is
    /// decoded from the buffer for every other version.
    fn decode_msg<R: bytes::Buf>(r: &mut R, version: Version) -> Result<Self, DecodeError> {
        let request_id = match version {
            Version::Draft17 => None,
            Version::Draft14 | Version::Draft15 | Version::Draft16 => {
                Some(RequestId::decode(r, version)?)
            }
        };

        // Field initializers are evaluated top to bottom, which matches the
        // order the fields appear on the wire.
        Ok(Self {
            request_id,
            status_code: u64::decode(r, version)?,
            stream_count: u64::decode(r, version)?,
            reason_phrase: Cow::<str>::decode(r, version)?,
        })
    }
}
/// `Publish` control message (message ID `0x1D`).
///
/// `request_id`, `track_namespace`, `track_name` and `track_alias` are written
/// inline for every version (`Draft17` additionally carries one extra `u64`
/// right after the request id). The remaining fields are written inline for
/// `Draft14` and as keyed parameters for `Draft15`, `Draft16` and `Draft17`.
#[derive(Debug)]
pub struct Publish<'a> {
/// Request id; always the first field on the wire.
pub request_id: RequestId,
/// Track namespace, encoded via `encode_namespace`.
pub track_namespace: Path<'a>,
/// Track name, encoded after the namespace.
pub track_name: Cow<'a, str>,
/// Track alias, encoded as a `u64` after the track name.
pub track_alias: u64,
/// Group order. Inline for `Draft14`; parameter `0x22` for later versions,
/// where a missing parameter decodes as `GroupOrder::Descending`.
pub group_order: GroupOrder,
/// Largest location, if any. `Draft14` writes a boolean flag followed by
/// the location when present; later versions use parameter `0x09`.
pub largest_location: Option<Location>,
/// Forward flag. Inline for `Draft14`; parameter `0x10` for later
/// versions, where a missing parameter decodes as `true`.
pub forward: bool,
}
impl Message for Publish<'_> {
    const ID: u64 = 0x1D;

    /// Writes the message body for `version`.
    ///
    /// The common header (request id, namespace, name, alias) is followed by
    /// either the inline `Draft14` fields or the keyed parameters used by
    /// `Draft15` and later.
    fn encode_msg<W: bytes::BufMut>(&self, w: &mut W, version: Version) -> Result<(), EncodeError> {
        self.request_id.encode(w, version)?;
        if matches!(version, Version::Draft17) {
            // Draft17 has an extra `u64` slot after the request id (the decoder
            // reads it as the required-request-id delta and discards it); it is
            // always written as zero.
            0u64.encode(w, version)?;
        }
        encode_namespace(w, &self.track_namespace, version)?;
        self.track_name.encode(w, version)?;
        self.track_alias.encode(w, version)?;

        match version {
            Version::Draft15 | Version::Draft16 | Version::Draft17 => {
                encode_params!(w, version,
                    0x09 => self.largest_location,
                    0x10 => self.forward,
                    0x22 => self.group_order,
                );
            }
            Version::Draft14 => {
                self.group_order.encode(w, version)?;
                // Presence flag first, then the location itself when there is one.
                self.largest_location.is_some().encode(w, version)?;
                if let Some(largest) = &self.largest_location {
                    largest.encode(w, version)?;
                }
                self.forward.encode(w, version)?;
                // The decoder reads a `Parameters` block here; a single zero is
                // written in its place (presumably an empty parameter list).
                0u8.encode(w, version)?;
            }
        }
        Ok(())
    }

    /// Reads the message body for `version`.
    ///
    /// For `Draft15` and later, absent parameters fall back to
    /// `GroupOrder::Descending` and `forward = true`, and the trailing
    /// properties are skipped. For `Draft14` the trailing parameters are
    /// decoded and discarded.
    fn decode_msg<R: bytes::Buf>(r: &mut R, version: Version) -> Result<Self, DecodeError> {
        let request_id = RequestId::decode(r, version)?;
        if matches!(version, Version::Draft17) {
            // Read and ignore the extra Draft17 slot.
            let _required_request_id_delta = u64::decode(r, version)?;
        }
        let track_namespace = decode_namespace(r, version)?;
        let track_name = Cow::<str>::decode(r, version)?;
        let track_alias = u64::decode(r, version)?;

        // Only the tail of the message differs per version, so each arm yields
        // the three version-dependent fields and the struct is built once.
        let (group_order, largest_location, forward) = match version {
            Version::Draft14 => {
                let order = GroupOrder::decode(r, version)?;
                let largest = if bool::decode(r, version)? {
                    Some(Location::decode(r, version)?)
                } else {
                    None
                };
                let fwd = bool::decode(r, version)?;
                let _params = Parameters::decode(r, version)?;
                (order, largest, fwd)
            }
            Version::Draft15 | Version::Draft16 | Version::Draft17 => {
                decode_params!(r, version,
                    0x09 => largest_location: Option<Location>,
                    0x10 => forward: Option<bool>,
                    0x22 => group_order: Option<GroupOrder>,
                );
                super::properties::skip_properties(r, version)?;
                (
                    group_order.unwrap_or(GroupOrder::Descending),
                    largest_location,
                    forward.unwrap_or(true),
                )
            }
        };

        Ok(Self {
            request_id,
            track_namespace,
            track_name,
            track_alias,
            group_order,
            largest_location,
            forward,
        })
    }
}
/// `PublishOk` control message (message ID `0x1E`).
///
/// `Draft14` writes every field inline; `Draft15`, `Draft16` and `Draft17`
/// carry them as keyed parameters.
#[derive(Debug)]
pub struct PublishOk {
/// Request id written first.
///
/// Must be `Some` for every version other than `Draft17` and `None` for
/// `Draft17`; `encode_msg` panics otherwise. Decoding a `Draft17` message
/// always yields `None`.
pub request_id: Option<RequestId>,
/// Forward flag. Parameter `0x10` from `Draft15` on; decodes as `true`
/// when the parameter is absent.
pub forward: bool,
/// Subscriber priority. Parameter `0x20` from `Draft15` on; decodes as
/// `128` when the parameter is absent.
pub subscriber_priority: u8,
/// Group order. Parameter `0x22` from `Draft15` on; decodes as
/// `GroupOrder::Descending` when the parameter is absent.
pub group_order: GroupOrder,
/// Filter type. Parameter `0x21` from `Draft15` on; decodes as
/// `FilterType::LargestObject` when the parameter is absent.
pub filter_type: FilterType,
}
impl Message for PublishOk {
const ID: u64 = 0x1E;
fn encode_msg<W: bytes::BufMut>(&self, w: &mut W, version: Version) -> Result<(), EncodeError> {
if version != Version::Draft17 {
self.request_id
.expect("request_id required for draft14-16")
.encode(w, version)?;
} else {
assert!(self.request_id.is_none(), "request_id must be None for draft17");
}
match version {
Version::Draft14 => {
self.forward.encode(w, version)?;
self.subscriber_priority.encode(w, version)?;
self.group_order.encode(w, version)?;
self.filter_type.encode(w, version)?;
debug_assert!(
matches!(self.filter_type, FilterType::LargestObject | FilterType::NextGroup),
"absolute subscribe not supported"
);
0u8.encode(w, version)?;
}
Version::Draft15 | Version::Draft16 | Version::Draft17 => {
encode_params!(w, version,
0x10 => self.forward,
0x20 => self.subscriber_priority,
0x21 => self.filter_type,
0x22 => self.group_order,
);
}
}
Ok(())
}
fn decode_msg<R: bytes::Buf>(r: &mut R, version: Version) -> Result<Self, DecodeError> {
let request_id = if version == Version::Draft17 {
None
} else {
Some(RequestId::decode(r, version)?)
};
match version {
Version::Draft14 => {
let forward = bool::decode(r, version)?;
let subscriber_priority = u8::decode(r, version)?;
let group_order = GroupOrder::decode(r, version)?;
let filter_type = FilterType::decode(r, version)?;
match filter_type {
FilterType::AbsoluteStart => {
let _start = Location::decode(r, version)?;
}
FilterType::AbsoluteRange => {
let _start = Location::decode(r, version)?;
let _end_group = u64::decode(r, version)?;
}
FilterType::NextGroup | FilterType::LargestObject => {}
};
let _params = Parameters::decode(r, version)?;
Ok(Self {
request_id,
forward,
subscriber_priority,
group_order,
filter_type,
})
}
Version::Draft15 | Version::Draft16 | Version::Draft17 => {
decode_params!(r, version,
0x10 => forward: Option<bool>,
0x20 => subscriber_priority: Option<u8>,
0x21 => filter_type: Option<FilterType>,
0x22 => group_order: Option<GroupOrder>,
);
let forward = forward.unwrap_or(true);
let subscriber_priority = subscriber_priority.unwrap_or(128);
let group_order = group_order.unwrap_or(GroupOrder::Descending);
let filter_type = filter_type.unwrap_or(FilterType::LargestObject);
Ok(Self {
request_id,
forward,
subscriber_priority,
group_order,
filter_type,
})
}
}
}
}
/// `PublishError` control message (message ID `0x1F`).
///
/// The wire layout is the same for every version: `request_id`,
/// `error_code`, `reason_phrase`.
#[derive(Debug)]
pub struct PublishError<'a> {
/// Request id; the first field on the wire.
pub request_id: RequestId,
/// Error code, encoded as a `u64`.
pub error_code: u64,
/// Text encoded last; borrowed or owned via `Cow`.
pub reason_phrase: Cow<'a, str>,
}
impl Message for PublishError<'_> {
    const ID: u64 = 0x1F;

    /// Writes `request_id`, `error_code` and `reason_phrase`, in that order.
    /// The layout does not depend on `version` beyond how each field encodes
    /// itself.
    fn encode_msg<W: bytes::BufMut>(&self, w: &mut W, version: Version) -> Result<(), EncodeError> {
        self.request_id.encode(w, version)?;
        self.error_code.encode(w, version)?;
        self.reason_phrase.encode(w, version)?;
        Ok(())
    }

    /// Reads the three fields back in the order `encode_msg` wrote them.
    fn decode_msg<R: bytes::Buf>(r: &mut R, version: Version) -> Result<Self, DecodeError> {
        // Field initializers are evaluated top to bottom, i.e. in wire order.
        Ok(Self {
            request_id: RequestId::decode(r, version)?,
            error_code: u64::decode(r, version)?,
            reason_phrase: Cow::<str>::decode(r, version)?,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BytesMut;

    /// Encodes `msg` for `version` into a fresh byte vector, panicking on
    /// encode errors.
    fn encode_message<M: Message>(msg: &M, version: Version) -> Vec<u8> {
        let mut out = BytesMut::new();
        msg.encode_msg(&mut out, version).unwrap();
        out.to_vec()
    }

    /// Decodes an `M` from a copy of `bytes` for `version`.
    fn decode_message<M: Message>(bytes: &[u8], version: Version) -> Result<M, DecodeError> {
        let mut input = bytes::Bytes::from(bytes.to_vec());
        M::decode_msg(&mut input, version)
    }

    /// Encodes the shared sample `Publish` for `version`, decodes it again and
    /// checks the fields that must survive the round trip.
    fn check_publish_round_trip(version: Version) {
        let original = Publish {
            request_id: RequestId(1),
            track_namespace: Path::new("test/ns"),
            track_name: "video".into(),
            track_alias: 42,
            group_order: GroupOrder::Descending,
            largest_location: Some(Location { group: 10, object: 5 }),
            forward: true,
        };

        let wire = encode_message(&original, version);
        let decoded: Publish = decode_message(&wire, version).unwrap();

        assert_eq!(decoded.request_id, RequestId(1));
        assert_eq!(decoded.track_namespace.as_str(), "test/ns");
        assert_eq!(decoded.track_name, "video");
        assert_eq!(decoded.track_alias, 42);
        assert_eq!(decoded.largest_location, Some(Location { group: 10, object: 5 }));
        assert!(decoded.forward);
    }

    /// Encodes the shared sample `PublishOk` carrying `request_id` for
    /// `version`, decodes it again and checks the round-tripped fields.
    fn check_publish_ok_round_trip(request_id: Option<RequestId>, version: Version) {
        let original = PublishOk {
            request_id,
            forward: true,
            subscriber_priority: 128,
            group_order: GroupOrder::Descending,
            filter_type: FilterType::LargestObject,
        };

        let wire = encode_message(&original, version);
        let decoded: PublishOk = decode_message(&wire, version).unwrap();

        assert_eq!(decoded.request_id, request_id);
        assert!(decoded.forward);
        assert_eq!(decoded.subscriber_priority, 128);
    }

    #[test]
    fn test_publish_v14_round_trip() {
        check_publish_round_trip(Version::Draft14);
    }

    #[test]
    fn test_publish_v15_round_trip() {
        check_publish_round_trip(Version::Draft15);
    }

    #[test]
    fn test_publish_ok_v14_round_trip() {
        check_publish_ok_round_trip(Some(RequestId(7)), Version::Draft14);
    }

    #[test]
    fn test_publish_ok_v15_round_trip() {
        check_publish_ok_round_trip(Some(RequestId(7)), Version::Draft15);
    }

    #[test]
    fn test_publish_v17_round_trip() {
        check_publish_round_trip(Version::Draft17);
    }

    #[test]
    fn test_publish_ok_v17_round_trip() {
        // Draft17 carries no request id on this message.
        check_publish_ok_round_trip(None, Version::Draft17);
    }

    #[test]
    fn test_publish_done_v17_round_trip() {
        let original = PublishDone {
            request_id: None,
            status_code: 200,
            stream_count: 5,
            reason_phrase: "OK".into(),
        };

        let wire = encode_message(&original, Version::Draft17);
        let decoded: PublishDone = decode_message(&wire, Version::Draft17).unwrap();

        assert_eq!(decoded.request_id, None);
        assert_eq!(decoded.status_code, 200);
        assert_eq!(decoded.stream_count, 5);
        assert_eq!(decoded.reason_phrase, "OK");
    }
}