use std::borrow::Cow;
use crate::{Path, coding::*, ietf::RequestId};
use super::Message;
use super::namespace::{decode_namespace, encode_namespace};
use super::Version;
#[derive(Clone, Debug)]
pub struct PublishNamespace<'a> {
pub request_id: RequestId,
pub track_namespace: Path<'a>,
}
impl Message for PublishNamespace<'_> {
const ID: u64 = 0x06;
fn encode_msg<W: bytes::BufMut>(&self, w: &mut W, version: Version) -> Result<(), EncodeError> {
self.request_id.encode(w, version)?;
if version == Version::Draft17 {
0u64.encode(w, version)?; }
encode_namespace(w, &self.track_namespace, version)?;
encode_params!(w, version,);
Ok(())
}
fn decode_msg<R: bytes::Buf>(r: &mut R, version: Version) -> Result<Self, DecodeError> {
let request_id = RequestId::decode(r, version)?;
if version == Version::Draft17 {
let _required_request_id_delta = u64::decode(r, version)?;
}
let track_namespace = decode_namespace(r, version)?;
decode_params!(r, version,);
Ok(Self {
request_id,
track_namespace,
})
}
}
#[derive(Clone, Debug)]
pub struct PublishNamespaceOk {
pub request_id: RequestId,
}
impl Message for PublishNamespaceOk {
const ID: u64 = 0x07;
fn encode_msg<W: bytes::BufMut>(&self, w: &mut W, version: Version) -> Result<(), EncodeError> {
self.request_id.encode(w, version)?;
Ok(())
}
fn decode_msg<R: bytes::Buf>(r: &mut R, version: Version) -> Result<Self, DecodeError> {
let request_id = RequestId::decode(r, version)?;
Ok(Self { request_id })
}
}
#[derive(Clone, Debug)]
pub struct PublishNamespaceError<'a> {
pub request_id: RequestId,
pub error_code: u64,
pub reason_phrase: Cow<'a, str>,
}
impl Message for PublishNamespaceError<'_> {
const ID: u64 = 0x08;
fn encode_msg<W: bytes::BufMut>(&self, w: &mut W, version: Version) -> Result<(), EncodeError> {
self.request_id.encode(w, version)?;
self.error_code.encode(w, version)?;
self.reason_phrase.encode(w, version)?;
Ok(())
}
fn decode_msg<R: bytes::Buf>(r: &mut R, version: Version) -> Result<Self, DecodeError> {
let request_id = RequestId::decode(r, version)?;
let error_code = u64::decode(r, version)?;
let reason_phrase = Cow::<str>::decode(r, version)?;
Ok(Self {
request_id,
error_code,
reason_phrase,
})
}
}
#[derive(Clone, Debug)]
pub struct PublishNamespaceDone<'a> {
pub track_namespace: Path<'a>,
pub request_id: RequestId,
}
impl Message for PublishNamespaceDone<'_> {
const ID: u64 = 0x09;
fn encode_msg<W: bytes::BufMut>(&self, w: &mut W, version: Version) -> Result<(), EncodeError> {
match version {
Version::Draft14 | Version::Draft15 => {
encode_namespace(w, &self.track_namespace, version)?;
}
Version::Draft16 => {
self.request_id.encode(w, version)?;
}
Version::Draft17 => return Err(EncodeError::Version),
}
Ok(())
}
fn decode_msg<R: bytes::Buf>(r: &mut R, version: Version) -> Result<Self, DecodeError> {
match version {
Version::Draft14 | Version::Draft15 => {
let track_namespace = decode_namespace(r, version)?;
Ok(Self {
track_namespace,
request_id: RequestId(0),
})
}
Version::Draft16 => {
let request_id = RequestId::decode(r, version)?;
Ok(Self {
track_namespace: Path::default(),
request_id,
})
}
Version::Draft17 => Err(DecodeError::Version),
}
}
}
#[derive(Clone, Debug)]
pub struct PublishNamespaceCancel<'a> {
pub track_namespace: Path<'a>,
pub request_id: RequestId,
pub error_code: u64,
pub reason_phrase: Cow<'a, str>,
}
impl Message for PublishNamespaceCancel<'_> {
const ID: u64 = 0x0c;
fn encode_msg<W: bytes::BufMut>(&self, w: &mut W, version: Version) -> Result<(), EncodeError> {
match version {
Version::Draft14 | Version::Draft15 => {
encode_namespace(w, &self.track_namespace, version)?;
}
Version::Draft16 => {
self.request_id.encode(w, version)?;
}
Version::Draft17 => {
return Err(EncodeError::Version);
}
}
self.error_code.encode(w, version)?;
self.reason_phrase.encode(w, version)?;
Ok(())
}
fn decode_msg<R: bytes::Buf>(r: &mut R, version: Version) -> Result<Self, DecodeError> {
let (track_namespace, request_id) = match version {
Version::Draft14 | Version::Draft15 => {
let track_namespace = decode_namespace(r, version)?;
(track_namespace, RequestId(0))
}
Version::Draft16 => {
let request_id = RequestId::decode(r, version)?;
(Path::default(), request_id)
}
Version::Draft17 => {
return Err(DecodeError::Version);
}
};
let error_code = u64::decode(r, version)?;
let reason_phrase = Cow::<str>::decode(r, version)?;
Ok(Self {
track_namespace,
request_id,
error_code,
reason_phrase,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::BytesMut;
fn encode_message<M: Message>(msg: &M, version: Version) -> Vec<u8> {
let mut buf = BytesMut::new();
msg.encode_msg(&mut buf, version).unwrap();
buf.to_vec()
}
fn decode_message<M: Message>(bytes: &[u8], version: Version) -> Result<M, DecodeError> {
let mut buf = bytes::Bytes::from(bytes.to_vec());
M::decode_msg(&mut buf, version)
}
#[test]
fn test_announce_round_trip() {
let msg = PublishNamespace {
request_id: RequestId(1),
track_namespace: Path::new("test/broadcast"),
};
let encoded = encode_message(&msg, Version::Draft14);
let decoded: PublishNamespace = decode_message(&encoded, Version::Draft14).unwrap();
assert_eq!(decoded.track_namespace.as_str(), "test/broadcast");
}
#[test]
fn test_announce_error() {
let msg = PublishNamespaceError {
request_id: RequestId(1),
error_code: 404,
reason_phrase: "Unauthorized".into(),
};
let encoded = encode_message(&msg, Version::Draft14);
let decoded: PublishNamespaceError = decode_message(&encoded, Version::Draft14).unwrap();
assert_eq!(decoded.error_code, 404);
assert_eq!(decoded.reason_phrase, "Unauthorized");
}
#[test]
fn test_unannounce_v14() {
let msg = PublishNamespaceDone {
track_namespace: Path::new("old/stream"),
request_id: RequestId(0),
};
let encoded = encode_message(&msg, Version::Draft14);
let decoded: PublishNamespaceDone = decode_message(&encoded, Version::Draft14).unwrap();
assert_eq!(decoded.track_namespace.as_str(), "old/stream");
}
#[test]
fn test_unannounce_v16() {
let msg = PublishNamespaceDone {
track_namespace: Path::default(),
request_id: RequestId(42),
};
let encoded = encode_message(&msg, Version::Draft16);
let decoded: PublishNamespaceDone = decode_message(&encoded, Version::Draft16).unwrap();
assert_eq!(decoded.request_id, RequestId(42));
}
#[test]
fn test_announce_cancel_v14() {
let msg = PublishNamespaceCancel {
track_namespace: Path::new("canceled"),
request_id: RequestId(0),
error_code: 1,
reason_phrase: "Shutdown".into(),
};
let encoded = encode_message(&msg, Version::Draft14);
let decoded: PublishNamespaceCancel = decode_message(&encoded, Version::Draft14).unwrap();
assert_eq!(decoded.track_namespace.as_str(), "canceled");
assert_eq!(decoded.error_code, 1);
assert_eq!(decoded.reason_phrase, "Shutdown");
}
#[test]
fn test_announce_cancel_v16() {
let msg = PublishNamespaceCancel {
track_namespace: Path::default(),
request_id: RequestId(7),
error_code: 1,
reason_phrase: "Shutdown".into(),
};
let encoded = encode_message(&msg, Version::Draft16);
let decoded: PublishNamespaceCancel = decode_message(&encoded, Version::Draft16).unwrap();
assert_eq!(decoded.request_id, RequestId(7));
assert_eq!(decoded.error_code, 1);
assert_eq!(decoded.reason_phrase, "Shutdown");
}
#[test]
fn test_publish_namespace_v17_round_trip() {
let msg = PublishNamespace {
request_id: RequestId(5),
track_namespace: Path::new("v17/broadcast"),
};
let encoded = encode_message(&msg, Version::Draft17);
let decoded: PublishNamespace = decode_message(&encoded, Version::Draft17).unwrap();
assert_eq!(decoded.request_id, RequestId(5));
assert_eq!(decoded.track_namespace.as_str(), "v17/broadcast");
}
#[test]
fn test_publish_namespace_done_v17_rejected() {
let msg = PublishNamespaceDone {
track_namespace: Path::default(),
request_id: RequestId(42),
};
let mut buf = BytesMut::new();
assert!(msg.encode_msg(&mut buf, Version::Draft17).is_err());
}
#[test]
fn test_publish_namespace_cancel_v17_rejected() {
let msg = PublishNamespaceCancel {
track_namespace: Path::default(),
request_id: RequestId(7),
error_code: 1,
reason_phrase: "Shutdown".into(),
};
let mut buf = BytesMut::new();
assert!(msg.encode_msg(&mut buf, Version::Draft17).is_err());
}
#[test]
fn test_announce_rejects_parameters() {
#[rustfmt::skip]
let invalid_bytes = vec![
0x01, 0x04, 0x74, 0x65, 0x73, 0x74, 0x01, ];
let result: Result<PublishNamespace, _> = decode_message(&invalid_bytes, Version::Draft14);
assert!(result.is_err());
}
}