mqtt_tiny/packets/
subscribe.rs

1//! MQTT [`SUBSCRIBE`](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718063)
2
3use crate::anyvec::AnyVec;
4use crate::coding::encoder::{PacketLenIter, TopicsQosIter, U16Iter, U8Iter, Unit};
5use crate::coding::length::Length;
6use crate::coding::{Decoder, Encoder};
7use crate::err;
8use crate::error::{Data, DecoderError, MemoryError};
9use crate::packets::TryFromIterator;
10use core::iter::Chain;
11use core::marker::PhantomData;
12
/// The in-memory form of an MQTT
/// [`SUBSCRIBE` packet](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718063)
///
/// `Seq` is the container that stores the `(topic, qos)` pairs, `Bytes` is the container that stores the bytes of a
/// single topic.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Subscribe<Seq, Bytes> {
    /// The packet identifier
    packet_id: u16,
    /// The requested subscriptions as `(topic, qos)`-tuples
    ///
    /// # QoS Levels
    /// Valid QoS levels are:
    ///  - `0`: At most one delivery
    ///  - `1`: At least one delivery
    ///  - `2`: Exactly one delivery
    topics_qos: Seq,
    /// Zero-sized marker that binds the byte vector type `Bytes` to the packet type
    _vec: PhantomData<Bytes>,
}
29impl<Seq, Bytes> Subscribe<Seq, Bytes>
30where
31    Seq: AnyVec<(Bytes, u8)>,
32    Bytes: AnyVec<u8>,
33{
34    /// The packet type constant
35    pub const TYPE: u8 = 8;
36
37    /// Creates a new packet
38    ///
39    /// # QoS Levels
40    /// Valid QoS levels are:
41    ///  - `0`: At most one delivery
42    ///  - `1`: At least one delivery
43    ///  - `2`: Exactly one delivery
44    pub fn new<S, T>(packet_id: u16, topics: S) -> Result<Self, MemoryError>
45    where
46        S: IntoIterator<Item = (T, u8)>,
47        T: AsRef<[u8]>,
48    {
49        // Collect all topic-qos pairs
50        let mut topics_qos = Seq::default();
51        for (topic, qos) in topics {
52            // Copy topic and append pair
53            let topic = Bytes::new(topic.as_ref())?;
54            topics_qos.push((topic, qos))?;
55        }
56
57        // Init self
58        Ok(Self { packet_id, topics_qos, _vec: PhantomData })
59    }
60
61    /// The packet ID
62    pub fn packet_id(&self) -> u16 {
63        self.packet_id
64    }
65
66    /// A list of `(topic, qos)`-tuples
67    pub fn topics_qos(&self) -> &Seq {
68        &self.topics_qos
69    }
70}
71impl<Seq, Bytes> TryFromIterator for Subscribe<Seq, Bytes>
72where
73    Seq: AnyVec<(Bytes, u8)>,
74    Bytes: AnyVec<u8>,
75{
76    fn try_from_iter<T>(iter: T) -> Result<Self, DecoderError>
77    where
78        T: IntoIterator<Item = u8>,
79    {
80        // Read packet:
81        //  - header type and `2` flags
82        //  - packet len
83        //  - packet ID
84        //  - sequence
85        //     - topic filter
86        //     - qos
87        let mut decoder = Decoder::new(iter);
88        let (Self::TYPE, [false, false, true, false]) = decoder.header()? else {
89            return Err(err!(Data::SpecViolation, "invalid packet type/header"))?;
90        };
91        // Limit length and make decoder peekable
92        let len = decoder.packetlen()?;
93        let mut decoder = decoder.limit(len).peekable();
94        // Read fields
95        let packet_id = decoder.u16()?;
96        let topics_qos = decoder.topics_qos()?;
97
98        // Init self
99        Ok(Self { packet_id, topics_qos, _vec: PhantomData })
100    }
101}
102impl<Seq, Bytes> IntoIterator for Subscribe<Seq, Bytes>
103where
104    Seq: AnyVec<(Bytes, u8)>,
105    Bytes: AnyVec<u8>,
106{
107    type Item = u8;
108    #[rustfmt::skip]
109    type IntoIter =
110        // Complex iterator built out of the individual message fields
111        Chain<Chain<Chain<Chain<
112            // - header type and `2` flags
113            Unit, U8Iter>,
114            // - packet len
115            PacketLenIter>,
116            // - packet ID
117            U16Iter>,
118            // - sequence
119            //    - topic filter
120            //    - qos
121            TopicsQosIter<Seq, Bytes>>;
122
123    fn into_iter(self) -> Self::IntoIter {
124        // Precompute body length:
125        //  - packet ID
126        //  - sequence
127        //     - topic filter
128        //     - qos
129        #[rustfmt::skip]
130        let len = Length::new()
131            .u16(&self.packet_id)
132            .topics_qos(&self.topics_qos)
133            .into();
134
135        // Write packet:
136        //  - header type and `2` flags
137        //  - packet len
138        //  - packet ID
139        //  - sequence
140        //     - topic filter
141        //     - qos
142        Encoder::default()
143            .header(Self::TYPE, [false, false, true, false])
144            .packetlen(len)
145            .u16(self.packet_id)
146            .topics_qos(self.topics_qos)
147            .into_iter()
148    }
149}