rmqtt_codec/
types.rs

1use std::fmt;
2use std::num::NonZeroU16;
3
4use bytes::Bytes;
5use bytestring::ByteString;
6use serde::{Deserialize, Serialize};
7
8use crate::v5::PublishProperties;
9
/// Protocol name bytes used by MQTT 3.1.1 and MQTT 5.0
pub(crate) const MQTT: &[u8] = b"MQTT";
/// Protocol name bytes used by the legacy MQTT 3.1 protocol
pub(crate) const MQISDP: &[u8] = b"MQIsdp";
/// Protocol level value identifying MQTT 3.1
pub const MQTT_LEVEL_31: u8 = 0x03;
/// Protocol level value identifying MQTT 3.1.1
pub const MQTT_LEVEL_311: u8 = 0x04;
/// Protocol level value identifying MQTT 5.0
pub const MQTT_LEVEL_5: u8 = 0x05;
/// Number of bits the Will QoS value is shifted left within the Connect flags byte
pub(crate) const WILL_QOS_SHIFT: u8 = 3;

/// Maximum allowed packet size: 2^28 - 1 = 268,435,455 bytes
pub(crate) const MAX_PACKET_SIZE: u32 = (1 << 28) - 1;
25
/// Represents MQTT protocol version information
///
/// Newtype over the numeric protocol level. The wrapped byte is what
/// [`Protocol::level`] returns, and [`Protocol::name`] maps it to the
/// corresponding protocol name string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
pub struct Protocol(pub u8);
29
30impl Protocol {
31    /// Gets the protocol name string
32    #[inline]
33    pub fn name(self) -> &'static str {
34        match self {
35            Protocol(MQTT_LEVEL_311) => "MQTT",
36            Protocol(MQTT_LEVEL_31) => "MQIsdp",
37            Protocol(MQTT_LEVEL_5) => "MQTT",
38            Protocol(_) => "MQTT",
39        }
40    }
41
42    /// Gets the protocol level number
43    #[inline]
44    pub fn level(self) -> u8 {
45        self.0
46    }
47}
48
49impl Default for Protocol {
50    /// Default protocol version (3.1.1)
51    fn default() -> Self {
52        Protocol(MQTT_LEVEL_311)
53    }
54}
55
prim_enum! {
    /// Quality of Service levels for message delivery
    ///
    /// Determines the guarantee of message delivery between client and broker.
    /// Each variant's explicit discriminant (0, 1, 2) is its numeric QoS level,
    /// the same number `QoS::value` returns for that variant.
    #[derive(serde::Serialize, serde::Deserialize, PartialOrd, Ord, Hash)]
    pub enum QoS {
        /// At most once delivery (Fire and Forget)
        ///
        /// # Typical use
        /// Non-critical sensor data where occasional loss is acceptable
        AtMostOnce = 0,

        /// At least once delivery (Acknowledged Delivery)
        ///
        /// # Typical use
        /// Important notifications that must be received but can tolerate duplicates
        AtLeastOnce = 1,

        /// Exactly once delivery (Assured Delivery)
        ///
        /// # Typical use
        /// Critical commands where duplication could cause harmful effects
        ExactlyOnce = 2
    }
}
81
82impl QoS {
83    /// Gets the numeric value of the QoS level
84    #[inline]
85    pub fn value(&self) -> u8 {
86        match self {
87            QoS::AtMostOnce => 0,
88            QoS::AtLeastOnce => 1,
89            QoS::ExactlyOnce => 2,
90        }
91    }
92
93    /// Returns the lower of two QoS levels
94    ///
95    /// # Example
96    /// ```
97    /// use rmqtt_codec::types::QoS;
98    ///
99    /// let lower = QoS::ExactlyOnce.less_value(QoS::AtLeastOnce);
100    /// assert_eq!(lower, QoS::AtLeastOnce);
101    /// ```
102    #[inline]
103    pub fn less_value(&self, qos: QoS) -> QoS {
104        if self.value() < qos.value() {
105            *self
106        } else {
107            qos
108        }
109    }
110}
111
112impl From<QoS> for u8 {
113    fn from(v: QoS) -> Self {
114        v.value()
115    }
116}
117
118bitflags::bitflags! {
119    /// Connection flags for MQTT CONNECT packet
120    #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
121    pub struct ConnectFlags: u8 {
122        /// Username flag (bit 7)
123        const USERNAME    = 0b1000_0000;
124        /// Password flag (bit 6)
125        const PASSWORD    = 0b0100_0000;
126        /// Will retain flag (bit 5)
127        const WILL_RETAIN = 0b0010_0000;
128        /// Will QoS mask (bits 4-3)
129        const WILL_QOS    = 0b0001_1000;
130        /// Will flag (bit 2)
131        const WILL        = 0b0000_0100;
132        /// Clean session flag (bit 1)
133        const CLEAN_START = 0b0000_0010;
134    }
135}
136
137bitflags::bitflags! {
138    /// Connection acknowledgment flags for MQTT CONNACK packet
139    #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
140    pub struct ConnectAckFlags: u8 {
141        /// Session present flag (bit 0)
142        const SESSION_PRESENT = 0b0000_0001;
143    }
144}
145
146/// Packet type identifiers and masks
147pub(super) mod packet_type {
148    /// CONNECT packet type (0x10)
149    pub(crate) const CONNECT: u8 = 0b0001_0000;
150    /// CONNACK packet type (0x20)
151    pub(crate) const CONNACK: u8 = 0b0010_0000;
152    /// PUBLISH packet type range start (0x30)
153    pub(crate) const PUBLISH_START: u8 = 0b0011_0000;
154    /// PUBLISH packet type range end (0x3F)
155    pub(crate) const PUBLISH_END: u8 = 0b0011_1111;
156    /// PUBACK packet type (0x40)
157    pub(crate) const PUBACK: u8 = 0b0100_0000;
158    /// PUBREC packet type (0x50)
159    pub(crate) const PUBREC: u8 = 0b0101_0000;
160    /// PUBREL packet type (0x62)
161    pub(crate) const PUBREL: u8 = 0b0110_0010;
162    /// PUBCOMP packet type (0x70)
163    pub(crate) const PUBCOMP: u8 = 0b0111_0000;
164    /// SUBSCRIBE packet type (0x82)
165    pub(crate) const SUBSCRIBE: u8 = 0b1000_0010;
166    /// SUBACK packet type (0x90)
167    pub(crate) const SUBACK: u8 = 0b1001_0000;
168    /// UNSUBSCRIBE packet type (0xA2)
169    pub(crate) const UNSUBSCRIBE: u8 = 0b1010_0010;
170    /// UNSUBACK packet type (0xB0)
171    pub(crate) const UNSUBACK: u8 = 0b1011_0000;
172    /// PINGREQ packet type (0xC0)
173    pub(crate) const PINGREQ: u8 = 0b1100_0000;
174    /// PINGRESP packet type (0xD0)
175    pub(crate) const PINGRESP: u8 = 0b1101_0000;
176    /// DISCONNECT packet type (0xE0)
177    pub(crate) const DISCONNECT: u8 = 0b1110_0000;
178    /// AUTH packet type (0xF0) - MQTT 5.0 only
179    pub(crate) const AUTH: u8 = 0b1111_0000;
180}
181
/// Fixed header found at the start of every MQTT packet
///
/// Holds the leading byte as received plus the decoded remaining length.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) struct FixedHeader {
    /// Leading byte of the packet, carrying the packet type and its flags
    pub(crate) first_byte: u8,
    /// Length of everything after the fixed header (variable header + payload)
    pub(crate) remaining_length: u32,
}
190
/// MQTT PUBLISH packet structure
///
/// `Debug` is implemented by hand for this type (rather than derived) so that
/// the payload is printed as a redaction marker instead of its contents.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct Publish {
    /// Duplicate delivery flag
    pub dup: bool,
    /// Retain message flag
    pub retain: bool,
    /// Quality of Service level
    pub qos: QoS,
    /// Topic name to publish to
    pub topic: ByteString,
    /// Packet identifier (required for QoS 1 and 2); `None` when absent
    pub packet_id: Option<NonZeroU16>,
    /// Message payload
    pub payload: Bytes,

    /// MQTT 5.0 properties (None for MQTT 3.1.1)
    pub properties: Option<PublishProperties>,
    /// Delayed publish interval in seconds
    pub delay_interval: Option<u32>,
    /// Message creation timestamp
    ///
    /// NOTE(review): the unit and epoch are not visible in this file — confirm
    /// against the code that sets this field.
    pub create_time: Option<i64>,
}
214
215impl fmt::Debug for Publish {
216    /// Security-conscious debug implementation that redacts payload
217    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
218        f.debug_struct("Publish")
219            .field("packet_id", &self.packet_id)
220            .field("topic", &self.topic)
221            .field("dup", &self.dup)
222            .field("retain", &self.retain)
223            .field("qos", &self.qos)
224            .field("payload", &"<REDACTED>")
225            .field("properties", &self.properties)
226            .field("delay_interval", &self.delay_interval)
227            .field("create_time", &self.create_time)
228            .finish()
229    }
230}