mqtt_tiny/packets/
publish.rs

1//! MQTT [`PUBLISH`](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037)
2
3use crate::anyvec::AnyVec;
4use crate::coding::encoder::{BytesIter, OptionalU16Iter, PacketLenIter, U8Iter, Unit};
5use crate::coding::length::Length;
6use crate::coding::{Decoder, Encoder};
7use crate::err;
8use crate::error::{Data, DecoderError, MemoryError};
9use crate::packets::TryFromIterator;
10use core::iter::Chain;
11
/// An MQTT [`PUBLISH` packet](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037)
///
/// `Bytes` is the buffer type that holds the topic and the payload.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Publish<Bytes> {
    /// The `DUP` flag; `true` if this packet is a redelivery
    dup: bool,
    /// The quality-of-service level of this packet
    ///
    /// # QoS Levels
    /// Valid QoS levels are:
    ///  - `0`: At most one delivery
    ///  - `1`: At least one delivery
    ///  - `2`: Exactly one delivery
    qos: u8,
    /// The `RETAIN` flag; `true` if the message should be retained
    retain: bool,
    /// The topic of the message
    topic: Bytes,
    /// The packet ID, if the packet carries one
    packet_id: Option<u16>,
    /// The message payload
    payload: Bytes,
}
34impl<Bytes> Publish<Bytes>
35where
36    Bytes: AnyVec<u8>,
37{
38    /// The packet type constant
39    pub const TYPE: u8 = 3;
40
41    /// Creates a new packet
42    pub fn new<T, P>(topic: T, payload: P, retain: bool) -> Result<Self, MemoryError>
43    where
44        T: AsRef<[u8]>,
45        P: AsRef<[u8]>,
46    {
47        let topic = Bytes::new(topic.as_ref())?;
48        let payload = Bytes::new(payload.as_ref())?;
49        Ok(Self { dup: false, qos: 0, retain, topic, packet_id: None, payload })
50    }
51    /// Configures the packet quality-of-service level and specifies whether this packet is a duplicate transmission
52    /// (aka retry) or not
53    ///
54    /// # QoS Levels
55    /// Valid QoS levels are:
56    ///  - `0`: At most one delivery
57    ///  - `1`: At least one delivery
58    ///  - `2`: Exactly one delivery
59    pub fn with_qos(mut self, qos: u8, packet_id: u16, dup: bool) -> Self {
60        self.dup = dup;
61        self.qos = qos;
62        self.packet_id = Some(packet_id);
63        self
64    }
65
66    /// The message topic
67    pub fn topic(&self) -> &[u8] {
68        self.topic.as_ref()
69    }
70
71    /// The payload
72    pub fn payload(&self) -> &[u8] {
73        self.payload.as_ref()
74    }
75
76    /// Whether the message should be retained
77    pub fn retain(&self) -> bool {
78        self.retain
79    }
80
81    /// Whether this packet is a redelivery or not
82    pub fn dup(&self) -> bool {
83        self.dup
84    }
85    /// The packet QoS
86    pub fn qos(&self) -> u8 {
87        self.qos
88    }
89    /// The packet ID
90    pub fn packet_id(&self) -> Option<u16> {
91        self.packet_id
92    }
93}
94impl<Bytes> TryFromIterator for Publish<Bytes>
95where
96    Bytes: AnyVec<u8>,
97{
98    fn try_from_iter<T>(iter: T) -> Result<Self, DecoderError>
99    where
100        T: IntoIterator<Item = u8>,
101    {
102        // Read packet:
103        //  - header type and flags
104        //  - packet len
105        //  - topic
106        //  - packet ID
107        let mut decoder = Decoder::new(iter);
108        let (Self::TYPE, [dup, qos0, qos1, retain]) = decoder.header()? else {
109            return Err(err!(Data::SpecViolation, "invalid packet type"))?;
110        };
111        // Limit length
112        let len = decoder.packetlen()?;
113        let mut decoder = decoder.limit(len);
114        // Read fields
115        let topic = decoder.bytes()?;
116        let packet_id = decoder.optional_u16(qos0 || qos1)?;
117        let payload = decoder.raw_remainder()?;
118
119        // Init self
120        let qos = ((qos0 as u8) << 1) | (qos1 as u8);
121        Ok(Self { dup, qos, retain, topic, packet_id, payload })
122    }
123}
124impl<Bytes> IntoIterator for Publish<Bytes>
125where
126    Bytes: AnyVec<u8>,
127{
128    type Item = u8;
129    #[rustfmt::skip]
130    type IntoIter =
131        // Complex iterator built out of the individual message fields
132        Chain<Chain<Chain<Chain<Chain<
133            // - header type and flags
134            Unit, U8Iter>,
135            // - packet len
136            PacketLenIter>,
137            // - topic
138            BytesIter<Bytes>>,
139            // - packet ID
140            OptionalU16Iter>,
141            //  - payload
142            <Bytes as IntoIterator>::IntoIter>;
143
144    fn into_iter(self) -> Self::IntoIter {
145        // Assemble flags
146        #[rustfmt::skip]
147        let flags = [
148            self.dup,
149            (self.qos >> 1) != 0,
150            (self.qos & 1) != 0,
151            self.retain
152        ];
153
154        // Precompute body length:
155        //  - header type and flags
156        //  - packet len
157        //  - payload
158        #[rustfmt::skip]
159        let len = Length::new()
160            .bytes(&self.topic)
161            .optional_u16(&self.packet_id)
162            .raw(&self.payload)
163            .into();
164
165        // Write packet:
166        //  - header type and flags
167        //  - packet len
168        //  - topic
169        //  - packet ID
170        //  - payload
171        Encoder::default()
172            .header(Self::TYPE, flags)
173            .packetlen(len)
174            .bytes(self.topic)
175            .optional_u16(self.packet_id)
176            .raw(self.payload)
177            .into_iter()
178    }
179}