ntex_mqtt/v5/codec/packet/
mod.rs

1use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut};
2
3pub use crate::types::{ConnectAckFlags, ConnectFlags, QoS};
4
5use super::{UserProperties, encode::*, property_type as pt};
6use crate::error::{DecodeError, EncodeError};
7use crate::types::packet_type;
8use crate::utils::{Decode, Property, take_properties, write_variable_length};
9
10mod auth;
11mod connack;
12mod connect;
13mod disconnect;
14mod pubacks;
15mod publish;
16mod subscribe;
17
18pub use auth::*;
19pub use connack::*;
20pub use connect::*;
21pub use disconnect::*;
22pub use pubacks::*;
23pub use publish::*;
24pub use subscribe::*;
25
26#[derive(Debug, PartialEq, Eq, Clone)]
27/// MQTT Control Packets
28pub enum Packet {
29    /// Client request to connect to Server
30    Connect(Box<Connect>),
31    /// Connect acknowledgment
32    ConnectAck(Box<ConnectAck>),
33    /// Publish acknowledgment
34    PublishAck(PublishAck),
35    /// Publish received (assured delivery part 1)
36    PublishReceived(PublishAck),
37    /// Publish release (assured delivery part 2)
38    PublishRelease(PublishAck2),
39    /// Publish complete (assured delivery part 3)
40    PublishComplete(PublishAck2),
41    /// Client subscribe request
42    Subscribe(Subscribe),
43    /// Subscribe acknowledgment
44    SubscribeAck(SubscribeAck),
45    /// Unsubscribe request
46    Unsubscribe(Unsubscribe),
47    /// Unsubscribe acknowledgment
48    UnsubscribeAck(UnsubscribeAck),
49    /// PING request
50    PingRequest,
51    /// PING response
52    PingResponse,
53    /// Disconnection is advertised
54    Disconnect(Disconnect),
55    /// Auth exchange
56    Auth(Auth),
57}
58
59impl Packet {
60    pub fn packet_type(&self) -> u8 {
61        match self {
62            Packet::Connect(_) => packet_type::CONNECT,
63            Packet::ConnectAck(_) => packet_type::CONNACK,
64            Packet::PublishAck(_) => packet_type::PUBACK,
65            Packet::PublishReceived(_) => packet_type::PUBREC,
66            Packet::PublishRelease(_) => packet_type::PUBREL,
67            Packet::PublishComplete(_) => packet_type::PUBCOMP,
68            Packet::Subscribe(_) => packet_type::SUBSCRIBE,
69            Packet::SubscribeAck(_) => packet_type::SUBACK,
70            Packet::Unsubscribe(_) => packet_type::UNSUBSCRIBE,
71            Packet::UnsubscribeAck(_) => packet_type::UNSUBACK,
72            Packet::PingRequest => packet_type::PINGREQ,
73            Packet::PingResponse => packet_type::PINGRESP,
74            Packet::Disconnect(_) => packet_type::DISCONNECT,
75            Packet::Auth(_) => packet_type::AUTH,
76        }
77    }
78}
79
80impl From<Connect> for Packet {
81    fn from(pkt: Connect) -> Self {
82        Self::Connect(Box::new(pkt))
83    }
84}
85
86impl From<Box<Connect>> for Packet {
87    fn from(pkt: Box<Connect>) -> Self {
88        Self::Connect(pkt)
89    }
90}
91
92impl From<ConnectAck> for Packet {
93    fn from(pkt: ConnectAck) -> Self {
94        Self::ConnectAck(Box::new(pkt))
95    }
96}
97
98impl From<Box<ConnectAck>> for Packet {
99    fn from(pkt: Box<ConnectAck>) -> Self {
100        Self::ConnectAck(pkt)
101    }
102}
103
104impl From<PublishAck> for Packet {
105    fn from(pkt: PublishAck) -> Self {
106        Self::PublishAck(pkt)
107    }
108}
109
110impl From<Subscribe> for Packet {
111    fn from(pkt: Subscribe) -> Self {
112        Self::Subscribe(pkt)
113    }
114}
115
116impl From<SubscribeAck> for Packet {
117    fn from(pkt: SubscribeAck) -> Self {
118        Self::SubscribeAck(pkt)
119    }
120}
121
122impl From<Unsubscribe> for Packet {
123    fn from(pkt: Unsubscribe) -> Self {
124        Self::Unsubscribe(pkt)
125    }
126}
127
128impl From<UnsubscribeAck> for Packet {
129    fn from(pkt: UnsubscribeAck) -> Self {
130        Self::UnsubscribeAck(pkt)
131    }
132}
133
134impl From<Disconnect> for Packet {
135    fn from(pkt: Disconnect) -> Self {
136        Self::Disconnect(pkt)
137    }
138}
139
140impl From<Auth> for Packet {
141    fn from(pkt: Auth) -> Self {
142        Self::Auth(pkt)
143    }
144}
145
146pub(super) mod property_type {
147    pub(crate) const UTF8_PAYLOAD: u8 = 0x01;
148    pub(crate) const MSG_EXPIRY_INT: u8 = 0x02;
149    pub(crate) const CONTENT_TYPE: u8 = 0x03;
150    pub(crate) const RESP_TOPIC: u8 = 0x08;
151    pub(crate) const CORR_DATA: u8 = 0x09;
152    pub(crate) const SUB_ID: u8 = 0x0B;
153    pub(crate) const SESS_EXPIRY_INT: u8 = 0x11;
154    pub(crate) const ASSND_CLIENT_ID: u8 = 0x12;
155    pub(crate) const SERVER_KA: u8 = 0x13;
156    pub(crate) const AUTH_METHOD: u8 = 0x15;
157    pub(crate) const AUTH_DATA: u8 = 0x16;
158    pub(crate) const REQ_PROB_INFO: u8 = 0x17;
159    pub(crate) const WILL_DELAY_INT: u8 = 0x18;
160    pub(crate) const REQ_RESP_INFO: u8 = 0x19;
161    pub(crate) const RESP_INFO: u8 = 0x1A;
162    pub(crate) const SERVER_REF: u8 = 0x1C;
163    pub(crate) const REASON_STRING: u8 = 0x1F;
164    pub(crate) const RECEIVE_MAX: u8 = 0x21;
165    pub(crate) const TOPIC_ALIAS_MAX: u8 = 0x22;
166    pub(crate) const TOPIC_ALIAS: u8 = 0x23;
167    pub(crate) const MAX_QOS: u8 = 0x24;
168    pub(crate) const RETAIN_AVAIL: u8 = 0x25;
169    pub(crate) const USER: u8 = 0x26;
170    pub(crate) const MAX_PACKET_SIZE: u8 = 0x27;
171    pub(crate) const WILDCARD_SUB_AVAIL: u8 = 0x28;
172    pub(crate) const SUB_IDS_AVAIL: u8 = 0x29;
173    pub(crate) const SHARED_SUB_AVAIL: u8 = 0x2A;
174}
175
176mod ack_props {
177    use super::*;
178    use crate::v5::codec::UserProperty;
179
180    pub(crate) fn encoded_size(
181        properties: &[UserProperty],
182        reason_string: &Option<ByteString>,
183        limit: u32,
184    ) -> usize {
185        if limit < 4 {
186            // todo: not really needed in practice
187            return 1; // 1 byte to encode property length = 0
188        }
189
190        let len = encoded_size_opt_props(properties, reason_string, limit - 4);
191        var_int_len(len) as usize + len
192    }
193
194    pub(crate) fn encode(
195        properties: &[UserProperty],
196        reason_string: &Option<ByteString>,
197        buf: &mut BytesMut,
198        size: u32,
199    ) -> Result<(), EncodeError> {
200        debug_assert!(size > 0); // formalize in signature?
201
202        if size == 1 {
203            // empty properties
204            buf.put_u8(0);
205            return Ok(());
206        }
207
208        let size = var_int_len_from_size(size);
209        write_variable_length(size, buf);
210        encode_opt_props(properties, reason_string, buf, size)
211    }
212
213    /// Parses ACK properties (User and Reason String properties) from `src`
214    pub(crate) fn decode(
215        src: &mut Bytes,
216    ) -> Result<(UserProperties, Option<ByteString>), DecodeError> {
217        let prop_src = &mut take_properties(src)?;
218        let mut reason_string = None;
219        let mut user_props = Vec::new();
220        while prop_src.has_remaining() {
221            let prop_id = prop_src.get_u8();
222            match prop_id {
223                pt::REASON_STRING => reason_string.read_value(prop_src)?,
224                pt::USER => user_props.push(<(ByteString, ByteString)>::decode(prop_src)?),
225                _ => return Err(DecodeError::MalformedPacket),
226            }
227        }
228
229        Ok((user_props, reason_string))
230    }
231}