ntex_mqtt/v5/codec/packet/
publish.rs

1use std::{fmt, num::NonZeroU16, num::NonZeroU32};
2
3use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut};
4
5use crate::error::{DecodeError, EncodeError};
6use crate::types::{packet_type, QoS};
7use crate::utils::{self, write_variable_length, Decode, Encode, Property};
8use crate::v5::codec::{encode::*, property_type as pt, UserProperties};
9
10/// PUBLISH message
11#[derive(Debug, PartialEq, Eq, Clone)]
12pub struct Publish {
13    /// this might be re-delivery of an earlier attempt to send the Packet.
14    pub dup: bool,
15    pub retain: bool,
16    /// the level of assurance for delivery of an Application Message.
17    pub qos: QoS,
18    /// only present in PUBLISH Packets where the QoS level is 1 or 2.
19    pub packet_id: Option<NonZeroU16>,
20    pub topic: ByteString,
21    pub payload_size: u32,
22    pub properties: PublishProperties,
23}
24
25#[derive(Debug, PartialEq, Eq, Clone, Default)]
26pub struct PublishProperties {
27    pub topic_alias: Option<NonZeroU16>,
28    pub correlation_data: Option<Bytes>,
29    pub message_expiry_interval: Option<NonZeroU32>,
30    pub content_type: Option<ByteString>,
31    pub user_properties: UserProperties,
32    pub is_utf8_payload: bool,
33    pub response_topic: Option<ByteString>,
34    pub subscription_ids: Vec<NonZeroU32>,
35}
36
37impl Publish {
38    pub(crate) fn decode(
39        src: &mut Bytes,
40        packet_flags: u8,
41        payload_size: u32,
42    ) -> Result<Self, DecodeError> {
43        let topic = ByteString::decode(src)?;
44        let qos = QoS::try_from((packet_flags & 0b0110) >> 1)?;
45        let packet_id = if qos == QoS::AtMostOnce {
46            None
47        } else {
48            Some(NonZeroU16::decode(src)?) // packet id = 0 encountered
49        };
50        let properties = parse_publish_properties(src)?;
51
52        Ok(Self {
53            qos,
54            topic,
55            packet_id,
56            properties,
57            payload_size,
58            dup: (packet_flags & 0b1000) == 0b1000,
59            retain: (packet_flags & 0b0001) == 0b0001,
60        })
61    }
62
63    pub(crate) fn packet_header_size(
64        src: &BytesMut,
65        packet_flags: u8,
66    ) -> Result<Option<u32>, DecodeError> {
67        if src.remaining() < 2 {
68            return Ok(None);
69        }
70
71        // topic len
72        let mut len = u16::from_be_bytes([src[0], src[1]]) as u32 + 2;
73
74        // packet-id len
75        let qos = QoS::try_from((packet_flags & 0b0110) >> 1)?;
76        if qos != QoS::AtMostOnce {
77            len += 2; // len of u16
78        }
79        if src.remaining() < len as usize {
80            return Ok(None);
81        }
82
83        // properties len
84        if let Some((prop_len, pos)) = utils::decode_variable_length(&src[len as usize..])? {
85            Ok(Some(len + prop_len + pos as u32))
86        } else {
87            Ok(None)
88        }
89    }
90}
91
92fn parse_publish_properties(src: &mut Bytes) -> Result<PublishProperties, DecodeError> {
93    let prop_src = &mut utils::take_properties(src)?;
94
95    let mut message_expiry_interval = None;
96    let mut topic_alias = None;
97    let mut content_type = None;
98    let mut correlation_data = None;
99    let mut subscription_ids = Vec::new();
100    let mut response_topic = None;
101    let mut is_utf8_payload = None;
102    let mut user_props = Vec::new();
103
104    while prop_src.has_remaining() {
105        match prop_src.get_u8() {
106            pt::UTF8_PAYLOAD => is_utf8_payload.read_value(prop_src)?,
107            pt::MSG_EXPIRY_INT => message_expiry_interval.read_value(prop_src)?,
108            pt::CONTENT_TYPE => content_type.read_value(prop_src)?,
109            pt::RESP_TOPIC => response_topic.read_value(prop_src)?,
110            pt::CORR_DATA => correlation_data.read_value(prop_src)?,
111            pt::SUB_ID => {
112                let id = utils::decode_variable_length_cursor(prop_src)?;
113                subscription_ids.push(NonZeroU32::new(id).ok_or(DecodeError::MalformedPacket)?);
114            }
115            pt::TOPIC_ALIAS => topic_alias.read_value(prop_src)?,
116            pt::USER => user_props.push(<(ByteString, ByteString)>::decode(prop_src)?),
117            _ => return Err(DecodeError::MalformedPacket),
118        }
119    }
120
121    Ok(PublishProperties {
122        message_expiry_interval,
123        topic_alias,
124        content_type,
125        correlation_data,
126        subscription_ids,
127        response_topic,
128        is_utf8_payload: is_utf8_payload.unwrap_or(false),
129        user_properties: user_props,
130    })
131}
132
133impl EncodeLtd for Publish {
134    fn encoded_size(&self, _limit: u32) -> usize {
135        let packet_id_size = if self.qos == QoS::AtMostOnce { 0 } else { 2 };
136        self.topic.encoded_size()
137            + packet_id_size
138            + self.properties.encoded_size(_limit)
139            + self.payload_size as usize
140    }
141
142    fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
143        // publish fixed headers
144        buf.put_u8(
145            packet_type::PUBLISH_START
146                | (u8::from(self.qos) << 1)
147                | ((self.dup as u8) << 3)
148                | (self.retain as u8),
149        );
150        utils::write_variable_length(size, buf);
151
152        // publish headers
153        let start_len = buf.len();
154
155        self.topic.encode(buf)?;
156        if self.qos == QoS::AtMostOnce {
157            if self.packet_id.is_some() {
158                return Err(EncodeError::MalformedPacket); // packet id must not be set
159            }
160        } else {
161            self.packet_id.ok_or(EncodeError::PacketIdRequired)?.encode(buf)?;
162        }
163        self.properties
164            .encode(buf, size - (buf.len() - start_len + self.payload_size as usize) as u32)?;
165
166        Ok(())
167    }
168}
169
170impl EncodeLtd for PublishProperties {
171    fn encoded_size(&self, _limit: u32) -> usize {
172        let prop_len = encoded_property_size(&self.topic_alias)
173            + encoded_property_size(&self.correlation_data)
174            + encoded_property_size(&self.message_expiry_interval)
175            + encoded_property_size(&self.content_type)
176            + encoded_property_size_default(&self.is_utf8_payload, false)
177            + encoded_property_size(&self.response_topic)
178            + self
179                .subscription_ids
180                .iter()
181                .fold(0, |acc, id| acc + 1 + var_int_len(id.get() as usize) as usize)
182            + self.user_properties.encoded_size();
183        prop_len + var_int_len(prop_len) as usize
184    }
185
186    fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
187        let prop_len = var_int_len_from_size(size);
188        utils::write_variable_length(prop_len, buf);
189        encode_property(&self.topic_alias, pt::TOPIC_ALIAS, buf)?;
190        encode_property(&self.correlation_data, pt::CORR_DATA, buf)?;
191        encode_property(&self.message_expiry_interval, pt::MSG_EXPIRY_INT, buf)?;
192        encode_property(&self.content_type, pt::CONTENT_TYPE, buf)?;
193        encode_property_default(&self.is_utf8_payload, false, pt::UTF8_PAYLOAD, buf)?;
194        encode_property(&self.response_topic, pt::RESP_TOPIC, buf)?;
195        for sub_id in self.subscription_ids.iter() {
196            buf.put_u8(pt::SUB_ID);
197            write_variable_length(sub_id.get(), buf);
198        }
199        self.user_properties.encode(buf)
200    }
201}