rmqtt_codec/v5/packet/
publish.rs

1use std::{num::NonZeroU16, num::NonZeroU32};
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4use bytestring::ByteString;
5use serde::{Deserialize, Serialize};
6
7use crate::error::{DecodeError, EncodeError};
8use crate::types::QoS;
9use crate::utils::{self, write_variable_length, Decode, Encode, Property};
10use crate::v5::{encode::*, property_type as pt, UserProperties};
11
/// Crate-internal name for the shared `crate::types::Publish` packet type. The `impl` blocks
/// in this file attach the v5 PUBLISH decoding and encoding logic to it.
12pub(crate) type Publish = crate::types::Publish;
13
14#[derive(Debug, PartialEq, Eq, Clone, Default, Serialize, Deserialize)]
15pub struct PublishProperties {
16    pub topic_alias: Option<NonZeroU16>,
17    pub correlation_data: Option<Bytes>,
18    pub message_expiry_interval: Option<NonZeroU32>,
19    pub content_type: Option<ByteString>,
20    pub user_properties: UserProperties,
21    pub is_utf8_payload: bool,
22    pub response_topic: Option<ByteString>,
23    pub subscription_ids: Vec<NonZeroU32>,
24}
25
26impl Publish {
27    pub(crate) fn decode(mut src: Bytes, packet_flags: u8) -> Result<Self, DecodeError> {
28        let topic = ByteString::decode(&mut src)?;
29        let qos = QoS::try_from((packet_flags & 0b0110) >> 1)?;
30        let packet_id = if qos == QoS::AtMostOnce {
31            None
32        } else {
33            Some(NonZeroU16::decode(&mut src)?) // packet id = 0 encountered
34        };
35
36        let properties = parse_publish_properties(&mut src)?;
37        let payload = src;
38
39        Ok(Self {
40            dup: (packet_flags & 0b1000) == 0b1000,
41            qos,
42            retain: (packet_flags & 0b0001) == 0b0001,
43            topic,
44            packet_id,
45            payload,
46            properties: Some(properties),
47        })
48    }
49}
50
51impl std::convert::From<UserProperties> for PublishProperties {
52    fn from(props: UserProperties) -> Self {
53        PublishProperties { user_properties: props, ..Default::default() }
54    }
55}
56
57fn parse_publish_properties(src: &mut Bytes) -> Result<PublishProperties, DecodeError> {
58    let prop_src = &mut utils::take_properties(src)?;
59
60    let mut message_expiry_interval = None;
61    let mut topic_alias = None;
62    let mut content_type = None;
63    let mut correlation_data = None;
64    let mut subscription_ids = Vec::new();
65    let mut response_topic = None;
66    let mut is_utf8_payload = None;
67    let mut user_props = Vec::new();
68
69    while prop_src.has_remaining() {
70        match prop_src.get_u8() {
71            pt::UTF8_PAYLOAD => is_utf8_payload.read_value(prop_src)?,
72            pt::MSG_EXPIRY_INT => message_expiry_interval.read_value(prop_src)?,
73            pt::CONTENT_TYPE => content_type.read_value(prop_src)?,
74            pt::RESP_TOPIC => response_topic.read_value(prop_src)?,
75            pt::CORR_DATA => correlation_data.read_value(prop_src)?,
76            pt::SUB_ID => {
77                let id = utils::decode_variable_length_cursor(prop_src)?;
78                subscription_ids.push(NonZeroU32::new(id).ok_or(DecodeError::MalformedPacket)?);
79            }
80            pt::TOPIC_ALIAS => topic_alias.read_value(prop_src)?,
81            pt::USER => user_props.push(<(ByteString, ByteString)>::decode(prop_src)?),
82            _ => return Err(DecodeError::MalformedPacket),
83        }
84    }
85
86    Ok(PublishProperties {
87        message_expiry_interval,
88        topic_alias,
89        content_type,
90        correlation_data,
91        subscription_ids,
92        response_topic,
93        is_utf8_payload: is_utf8_payload.unwrap_or(false),
94        user_properties: user_props,
95    })
96}
97
98impl EncodeLtd for Publish {
99    fn encoded_size(&self, _limit: u32) -> usize {
100        let packet_id_size = if self.qos == QoS::AtMostOnce { 0 } else { 2 };
101        self.topic.encoded_size()
102            + packet_id_size
103            + self
104                .properties
105                .as_ref()
106                .map(|p| p.encoded_size(_limit))
107                .unwrap_or_else(|| PublishProperties::default().encoded_size(_limit))
108            + self.payload.len()
109    }
110
111    fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
112        let start_len = buf.len();
113        self.topic.encode(buf)?;
114        if self.qos == QoS::AtMostOnce {
115            if self.packet_id.is_some() {
116                return Err(EncodeError::MalformedPacket); // packet id must not be set
117            }
118        } else {
119            self.packet_id.ok_or(EncodeError::PacketIdRequired)?.encode(buf)?;
120        }
121        if let Some(prop) = &self.properties {
122            prop.encode(buf, size - (buf.len() - start_len + self.payload.len()) as u32)?;
123        } else {
124            PublishProperties::default()
125                .encode(buf, size - (buf.len() - start_len + self.payload.len()) as u32)?;
126        }
127        buf.put(self.payload.as_ref());
128        Ok(())
129    }
130}
131
132impl EncodeLtd for PublishProperties {
133    fn encoded_size(&self, _limit: u32) -> usize {
134        let prop_len = encoded_property_size(&self.topic_alias)
135            + encoded_property_size(&self.correlation_data)
136            + encoded_property_size(&self.message_expiry_interval)
137            + encoded_property_size(&self.content_type)
138            + encoded_property_size_default(&self.is_utf8_payload, false)
139            + encoded_property_size(&self.response_topic)
140            + self
141                .subscription_ids
142                .iter()
143                .fold(0, |acc, id| acc + 1 + var_int_len(id.get() as usize) as usize)
144            + self.user_properties.encoded_size();
145        prop_len + var_int_len(prop_len) as usize
146    }
147
148    fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
149        let prop_len = var_int_len_from_size(size);
150        utils::write_variable_length(prop_len, buf);
151        encode_property(&self.topic_alias, pt::TOPIC_ALIAS, buf)?;
152        encode_property(&self.correlation_data, pt::CORR_DATA, buf)?;
153        encode_property(&self.message_expiry_interval, pt::MSG_EXPIRY_INT, buf)?;
154        encode_property(&self.content_type, pt::CONTENT_TYPE, buf)?;
155        encode_property_default(&self.is_utf8_payload, false, pt::UTF8_PAYLOAD, buf)?;
156        encode_property(&self.response_topic, pt::RESP_TOPIC, buf)?;
157        for sub_id in self.subscription_ids.iter() {
158            buf.put_u8(pt::SUB_ID);
159            write_variable_length(sub_id.get(), buf);
160        }
161        self.user_properties.encode(buf)
162    }
163}