use super::*;
use crate::prelude::DEFAULT_MIMETYPE;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::time::Duration;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SetupFrame {
flags: Flags,
version: Version,
keepalive: u32,
lifetime: u32,
resume_token: Option<Bytes>,
metadata_mimetype: Bytes,
data_mimetype: Bytes,
payload: Payload,
}
impl SetupFrame {
pub const STREAM_ID: u32 = 0;
pub const TYPE: FrameType = FrameType::SETUP;
pub fn builder() -> SetupFrameBuilder {
SetupFrameBuilder::default()
}
pub fn is_lease(&self) -> bool {
self.flags.contains(Flags::LEASE)
}
pub fn is_resume(&self) -> bool {
self.flags.contains(Flags::RESUME)
}
pub fn version(&self) -> Version {
self.version
}
pub fn keepalive(&self) -> Duration {
Duration::from_millis(self.keepalive as u64)
}
pub fn lifetime(&self) -> Duration {
Duration::from_millis(self.lifetime as u64)
}
pub fn resume_token(&self) -> Option<&Bytes> {
self.resume_token.as_ref()
}
pub fn metadata_mimetype(&self) -> Option<&str> {
match std::str::from_utf8(&self.metadata_mimetype).ok() {
Some(x) => {
if x.is_ascii() {
Some(x)
} else {
None
}
}
None => None,
}
}
pub fn data_mimetype(&self) -> Option<&str> {
match std::str::from_utf8(&self.data_mimetype).ok() {
Some(x) => {
if x.is_ascii() {
Some(x)
} else {
None
}
}
None => None,
}
}
pub fn metadata(&self) -> Option<&Bytes> {
self.payload.metadata()
}
pub fn data(&self) -> Option<&Bytes> {
self.payload.data()
}
pub fn payload(self) -> Payload {
self.payload
}
}
impl Encode for SetupFrame {
fn encode(&self, buf: &mut BytesMut) {
buf.put_u32(SetupFrame::STREAM_ID);
buf.put_u16(FrameType::SETUP.bits() | self.flags.bits());
self.version.encode(buf);
buf.put_u32(self.keepalive);
buf.put_u32(self.lifetime);
if let Some(token) = &self.resume_token {
buf.put_u16(token.len() as u16);
buf.put_slice(token);
}
buf.put_u8(self.metadata_mimetype.len() as u8);
buf.put_slice(&self.metadata_mimetype);
buf.put_u8(self.data_mimetype.len() as u8);
buf.put_slice(&self.data_mimetype);
let u24 = U24::from_usize(
self.payload.metadata().map(|v| v.len()).unwrap_or_default(),
);
buf.put_u8(u24.0);
buf.put_u16(u24.1);
self.payload.encode(buf);
}
fn len(&self) -> usize {
let mut len = 20;
if let Some(resume_token) = &self.resume_token {
len += resume_token.len();
}
len += 1 + self.metadata_mimetype.len() + 1 + self.data_mimetype.len();
len += 3 + self.payload.len();
len
}
}
impl Decode for SetupFrame {
type Value = Self;
fn decode<B: Buf>(
buf: &mut B,
_stream_id: u32,
flags: Flags,
) -> Result<Self::Value> {
let version = eat_version(buf)?;
let keepalive = eat_u31(buf)?;
let lifetime = eat_u31(buf)?;
let resume_token = eat_resume_token(buf, flags)?;
let metadata_mimetype_len = eat_u8(buf)?;
let metadata_mimetype =
eat_bytes(buf, metadata_mimetype_len as usize)?;
let data_mimetype_len = eat_u8(buf)?;
let data_mimetype = eat_bytes(buf, data_mimetype_len as usize)?;
let payload = eat_payload(buf, true)?;
Ok(SetupFrame {
flags,
version,
keepalive,
lifetime,
resume_token,
metadata_mimetype,
data_mimetype,
payload,
})
}
}
#[derive(Debug)]
pub struct SetupFrameBuilder {
flags: Flags,
version: Version,
keepalive: u32,
lifetime: u32,
resume_token: Option<Bytes>,
metadata_mimetype: Bytes,
data_mimetype: Bytes,
payload: Payload,
}
impl Default for SetupFrameBuilder {
fn default() -> SetupFrameBuilder {
SetupFrameBuilder {
flags: Flags::empty(),
version: Version::default(),
keepalive: 45_000,
lifetime: 90_000,
resume_token: None,
metadata_mimetype: Bytes::from(DEFAULT_MIMETYPE),
data_mimetype: Bytes::from(DEFAULT_MIMETYPE),
payload: Payload::default(),
}
}
}
impl SetupFrameBuilder {
pub fn set_resume_flag(mut self) -> Self {
self.flags |= Flags::RESUME;
self
}
pub fn set_lease_flag(mut self) -> Self {
self.flags |= Flags::LEASE;
self
}
pub fn set_version(mut self, major: u16, minor: u16) -> Self {
self.version = Version::new(major, minor);
self
}
pub fn set_keepalive(mut self, keepalive: u32) -> Self {
debug_assert_max_u31!(keepalive);
self.keepalive = keepalive & MAX_U31;
self
}
pub fn set_lifetime(mut self, lifetime: u32) -> Self {
debug_assert_max_u31!(lifetime);
self.lifetime = lifetime & MAX_U31;
self
}
pub fn set_resume_token(mut self, token: Bytes) -> Self {
assert!(token.len() <= 65_535);
self.resume_token = Some(token);
self.flags |= Flags::RESUME;
self
}
pub fn set_metadata_mimetype<T>(mut self, mimetype: T) -> Self
where
T: Into<String>,
{
let mimetype: String = mimetype.into();
assert!(mimetype.len() <= 256);
self.metadata_mimetype = Bytes::from(mimetype);
self
}
pub fn set_data_mimetype<T>(mut self, mimetype: T) -> Self
where
T: Into<String>,
{
let mimetype: String = mimetype.into();
assert!(mimetype.len() <= 256);
self.data_mimetype = Bytes::from(mimetype);
self
}
pub fn set_metadata(mut self, metadata: Bytes) -> Self {
self.flags |= Flags::METADATA;
self.payload.metadata = Some(metadata);
self
}
pub fn set_data(mut self, data: Bytes) -> Self {
self.payload.data = Some(data);
self
}
pub fn build(self) -> SetupFrame {
SetupFrame {
flags: self.flags,
version: self.version,
keepalive: self.keepalive,
lifetime: self.lifetime,
resume_token: self.resume_token,
metadata_mimetype: self.metadata_mimetype,
data_mimetype: self.data_mimetype,
payload: self.payload,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_stream_id() {
assert_eq!(SetupFrame::STREAM_ID, 0);
}
#[test]
fn test_codec() {
let setup = SetupFrame::builder()
.set_version(1, 0)
.set_keepalive(1000)
.set_lifetime(2000)
.set_resume_flag()
.set_lease_flag()
.set_resume_token(Bytes::from("resume token".to_string()))
.set_metadata_mimetype("application/json")
.set_data_mimetype("application/binary")
.set_metadata(Bytes::from("metadata"))
.set_data(Bytes::from("data"))
.build();
let mut buf = BytesMut::new();
setup.encode(&mut buf);
let mut buf = buf.freeze();
let buf_len = buf.len();
assert_eq!(
buf_len,
4 + 2 + 4 + 4 + 4 + 2 + 12 + 1 + 16 + 1 + 18 + 3 + 8 + 4
);
let stream_id = eat_stream_id(&mut buf).unwrap();
let (frame_type, flags) = eat_flags(&mut buf).unwrap();
assert_eq!(stream_id, 0);
assert_eq!(frame_type, FrameType::SETUP);
assert_eq!(flags, Flags::METADATA | Flags::RESUME | Flags::LEASE);
let decoded = SetupFrame::decode(&mut buf, stream_id, flags).unwrap();
assert_eq!(decoded, setup);
assert_eq!(setup.len(), buf_len);
assert_eq!(decoded.len(), buf_len);
}
}