rmqtt_codec/v5/packet/
mod.rs

1use bytes::{Buf, BufMut, Bytes, BytesMut};
2use bytestring::ByteString;
3
4pub use crate::types::{ConnectAckFlags, ConnectFlags, QoS};
5
6use super::{encode::*, property_type as pt, UserProperties};
7use crate::error::{DecodeError, EncodeError};
8use crate::types::packet_type;
9use crate::utils::{take_properties, write_variable_length, Decode, Property};
10
11mod auth;
12mod connack;
13mod connect;
14mod disconnect;
15mod pubacks;
16mod publish;
17mod subscribe;
18
19pub use auth::*;
20pub use connack::*;
21pub use connect::*;
22pub use disconnect::*;
23pub use pubacks::*;
24pub use publish::*;
25pub use subscribe::*;
26
27#[derive(Debug, PartialEq, Eq, Clone)]
28/// MQTT Control Packets
29pub enum Packet {
30    /// Client request to connect to Server
31    Connect(Box<Connect>),
32    /// Connect acknowledgment
33    ConnectAck(Box<ConnectAck>),
34    /// Publish message
35    Publish(Box<Publish>),
36    /// Publish acknowledgment
37    PublishAck(PublishAck),
38    /// Publish received (assured delivery part 1)
39    PublishReceived(PublishAck),
40    /// Publish release (assured delivery part 2)
41    PublishRelease(PublishAck2),
42    /// Publish complete (assured delivery part 3)
43    PublishComplete(PublishAck2),
44    /// Client subscribe request
45    Subscribe(Subscribe),
46    /// Subscribe acknowledgment
47    SubscribeAck(SubscribeAck),
48    /// Unsubscribe request
49    Unsubscribe(Unsubscribe),
50    /// Unsubscribe acknowledgment
51    UnsubscribeAck(UnsubscribeAck),
52    /// PING request
53    PingRequest,
54    /// PING response
55    PingResponse,
56    /// Disconnection is advertised
57    Disconnect(Disconnect),
58    /// Auth exchange
59    Auth(Auth),
60}
61
62impl Packet {
63    pub fn packet_type(&self) -> u8 {
64        match self {
65            Packet::Connect(_) => packet_type::CONNECT,
66            Packet::ConnectAck(_) => packet_type::CONNACK,
67            Packet::Publish(_) => packet_type::PUBLISH_START,
68            Packet::PublishAck(_) => packet_type::PUBACK,
69            Packet::PublishReceived(_) => packet_type::PUBREC,
70            Packet::PublishRelease(_) => packet_type::PUBREL,
71            Packet::PublishComplete(_) => packet_type::PUBCOMP,
72            Packet::Subscribe(_) => packet_type::SUBSCRIBE,
73            Packet::SubscribeAck(_) => packet_type::SUBACK,
74            Packet::Unsubscribe(_) => packet_type::UNSUBSCRIBE,
75            Packet::UnsubscribeAck(_) => packet_type::UNSUBACK,
76            Packet::PingRequest => packet_type::PINGREQ,
77            Packet::PingResponse => packet_type::PINGRESP,
78            Packet::Disconnect(_) => packet_type::DISCONNECT,
79            Packet::Auth(_) => packet_type::AUTH,
80        }
81    }
82}
83
84impl From<Connect> for Packet {
85    fn from(pkt: Connect) -> Self {
86        Self::Connect(Box::new(pkt))
87    }
88}
89
90impl From<Box<Connect>> for Packet {
91    fn from(pkt: Box<Connect>) -> Self {
92        Self::Connect(pkt)
93    }
94}
95
96impl From<ConnectAck> for Packet {
97    fn from(pkt: ConnectAck) -> Self {
98        Self::ConnectAck(Box::new(pkt))
99    }
100}
101
102impl From<Box<ConnectAck>> for Packet {
103    fn from(pkt: Box<ConnectAck>) -> Self {
104        Self::ConnectAck(pkt)
105    }
106}
107
108impl From<Publish> for Packet {
109    fn from(pkt: Publish) -> Self {
110        Self::Publish(Box::new(pkt))
111    }
112}
113
114impl From<PublishAck> for Packet {
115    fn from(pkt: PublishAck) -> Self {
116        Self::PublishAck(pkt)
117    }
118}
119
120impl From<Subscribe> for Packet {
121    fn from(pkt: Subscribe) -> Self {
122        Self::Subscribe(pkt)
123    }
124}
125
126impl From<SubscribeAck> for Packet {
127    fn from(pkt: SubscribeAck) -> Self {
128        Self::SubscribeAck(pkt)
129    }
130}
131
132impl From<Unsubscribe> for Packet {
133    fn from(pkt: Unsubscribe) -> Self {
134        Self::Unsubscribe(pkt)
135    }
136}
137
138impl From<UnsubscribeAck> for Packet {
139    fn from(pkt: UnsubscribeAck) -> Self {
140        Self::UnsubscribeAck(pkt)
141    }
142}
143
144impl From<Disconnect> for Packet {
145    fn from(pkt: Disconnect) -> Self {
146        Self::Disconnect(pkt)
147    }
148}
149
150impl From<Auth> for Packet {
151    fn from(pkt: Auth) -> Self {
152        Self::Auth(pkt)
153    }
154}
155
156pub(super) mod property_type {
157    pub(crate) const UTF8_PAYLOAD: u8 = 0x01;
158    pub(crate) const MSG_EXPIRY_INT: u8 = 0x02;
159    pub(crate) const CONTENT_TYPE: u8 = 0x03;
160    pub(crate) const RESP_TOPIC: u8 = 0x08;
161    pub(crate) const CORR_DATA: u8 = 0x09;
162    pub(crate) const SUB_ID: u8 = 0x0B;
163    pub(crate) const SESS_EXPIRY_INT: u8 = 0x11;
164    pub(crate) const ASSND_CLIENT_ID: u8 = 0x12;
165    pub(crate) const SERVER_KA: u8 = 0x13;
166    pub(crate) const AUTH_METHOD: u8 = 0x15;
167    pub(crate) const AUTH_DATA: u8 = 0x16;
168    pub(crate) const REQ_PROB_INFO: u8 = 0x17;
169    pub(crate) const WILL_DELAY_INT: u8 = 0x18;
170    pub(crate) const REQ_RESP_INFO: u8 = 0x19;
171    pub(crate) const RESP_INFO: u8 = 0x1A;
172    pub(crate) const SERVER_REF: u8 = 0x1C;
173    pub(crate) const REASON_STRING: u8 = 0x1F;
174    pub(crate) const RECEIVE_MAX: u8 = 0x21;
175    pub(crate) const TOPIC_ALIAS_MAX: u8 = 0x22;
176    pub(crate) const TOPIC_ALIAS: u8 = 0x23;
177    pub(crate) const MAX_QOS: u8 = 0x24;
178    pub(crate) const RETAIN_AVAIL: u8 = 0x25;
179    pub(crate) const USER: u8 = 0x26;
180    pub(crate) const MAX_PACKET_SIZE: u8 = 0x27;
181    pub(crate) const WILDCARD_SUB_AVAIL: u8 = 0x28;
182    pub(crate) const SUB_IDS_AVAIL: u8 = 0x29;
183    pub(crate) const SHARED_SUB_AVAIL: u8 = 0x2A;
184}
185
186mod ack_props {
187    use super::*;
188    use crate::v5::UserProperty;
189
190    pub(crate) fn encoded_size(
191        properties: &[UserProperty],
192        reason_string: &Option<ByteString>,
193        limit: u32,
194    ) -> usize {
195        if limit < 4 {
196            // todo: not really needed in practice
197            return 1; // 1 byte to encode property length = 0
198        }
199
200        let len = encoded_size_opt_props(properties, reason_string, limit - 4);
201        var_int_len(len) as usize + len
202    }
203
204    pub(crate) fn encode(
205        properties: &[UserProperty],
206        reason_string: &Option<ByteString>,
207        buf: &mut BytesMut,
208        size: u32,
209    ) -> Result<(), EncodeError> {
210        debug_assert!(size > 0); // formalize in signature?
211
212        if size == 1 {
213            // empty properties
214            buf.put_u8(0);
215            return Ok(());
216        }
217
218        let size = var_int_len_from_size(size);
219        write_variable_length(size, buf);
220        encode_opt_props(properties, reason_string, buf, size)
221    }
222
223    /// Parses ACK properties (User and Reason String properties) from `src`
224    pub(crate) fn decode(src: &mut Bytes) -> Result<(UserProperties, Option<ByteString>), DecodeError> {
225        let prop_src = &mut take_properties(src)?;
226        let mut reason_string = None;
227        let mut user_props = Vec::new();
228        while prop_src.has_remaining() {
229            let prop_id = prop_src.get_u8();
230            match prop_id {
231                pt::REASON_STRING => reason_string.read_value(prop_src)?,
232                pt::USER => user_props.push(<(ByteString, ByteString)>::decode(prop_src)?),
233                _ => return Err(DecodeError::MalformedPacket),
234            }
235        }
236
237        Ok((user_props, reason_string))
238    }
239}