mqttrust/encoding/v4/
subscribe.rs

1use core::marker::PhantomData;
2
3use super::{decoder::*, encoder::*, *};
4
5/// Subscribe topic.
6///
7/// [Subscribe] packets contain a `Vec` of those.
8///
9/// [Subscribe]: struct.Subscribe.html
10#[derive(Debug, Clone, PartialEq)]
11pub struct SubscribeTopic<'a> {
12    pub topic_path: &'a str,
13    pub qos: QoS,
14}
15
16impl<'a> FromBuffer<'a> for SubscribeTopic<'a> {
17    type Item = Self;
18
19    fn from_buffer(buf: &'a [u8], offset: &mut usize) -> Result<Self::Item, Error> {
20        let topic_path = read_str(buf, offset)?;
21        let qos = QoS::from_u8(buf[*offset])?;
22        *offset += 1;
23        Ok(SubscribeTopic { topic_path, qos })
24    }
25}
26
27impl<'a> FromBuffer<'a> for &'a str {
28    type Item = Self;
29
30    fn from_buffer(buf: &'a [u8], offset: &mut usize) -> Result<Self::Item, Error> {
31        read_str(buf, offset)
32    }
33}
34
35pub trait FromBuffer<'a> {
36    type Item;
37    fn from_buffer(buf: &'a [u8], offset: &mut usize) -> Result<Self::Item, Error>;
38}
39
40/// Subscribe return value.
41///
42/// [Suback] packets contain a `Vec` of those.
43///
44/// [Suback]: struct.Subscribe.html
45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46pub enum SubscribeReturnCodes {
47    Success(QoS),
48    Failure,
49}
50
51impl<'a> FromBuffer<'a> for SubscribeReturnCodes {
52    type Item = Self;
53
54    fn from_buffer(buf: &'a [u8], offset: &mut usize) -> Result<Self::Item, Error> {
55        let code = buf[*offset];
56        *offset += 1;
57
58        if code == 0x80 {
59            Ok(SubscribeReturnCodes::Failure)
60        } else {
61            Ok(SubscribeReturnCodes::Success(QoS::from_u8(code)?))
62        }
63    }
64}
65
66impl SubscribeReturnCodes {
67    pub(crate) fn as_u8(&self) -> u8 {
68        match *self {
69            SubscribeReturnCodes::Failure => 0x80,
70            SubscribeReturnCodes::Success(qos) => qos.as_u8(),
71        }
72    }
73}
74
75#[derive(Debug, Clone, PartialEq)]
76pub enum List<'a, T> {
77    Owned(&'a [T]),
78    Lazy(LazyList<'a, T>),
79}
80
81impl<'a, T> List<'a, T>
82where
83    T: FromBuffer<'a, Item = T>,
84{
85    pub fn len(&self) -> usize {
86        match self {
87            List::Owned(data) => data.len(),
88            List::Lazy(data) => {
89                let mut len = 0;
90                let mut offset = 0;
91                while T::from_buffer(data.0, &mut offset).is_ok() {
92                    len += 1;
93                }
94                len
95            }
96        }
97    }
98}
99
100impl<'a, T> IntoIterator for &'a List<'a, T>
101where
102    T: FromBuffer<'a, Item = T> + Clone,
103{
104    type Item = T;
105
106    type IntoIter = ListIter<'a, T>;
107
108    fn into_iter(self) -> Self::IntoIter {
109        ListIter {
110            list: self,
111            index: 0,
112        }
113    }
114}
115
/// Still-encoded bytes of a list of `T`; the `PhantomData` only records the
/// item type and stores no data.
#[derive(Clone, Debug, PartialEq)]
pub struct LazyList<'a, T>(&'a [u8], PhantomData<T>);
118
119pub struct ListIter<'a, T> {
120    list: &'a List<'a, T>,
121    index: usize,
122}
123
124impl<'a, T> Iterator for ListIter<'a, T>
125where
126    T: FromBuffer<'a, Item = T> + Clone,
127{
128    type Item = T;
129
130    fn next(&mut self) -> Option<Self::Item> {
131        match self.list {
132            List::Owned(data) => {
133                // FIXME: Can we get rid of this clone?
134                let item = data.get(self.index).cloned();
135                self.index += 1;
136                item
137            }
138            List::Lazy(data) => T::from_buffer(data.0, &mut self.index).ok(),
139        }
140    }
141}
142
143/// Subscribe packet ([MQTT 3.8]).
144///
145/// [MQTT 3.8]: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718063
146#[derive(Debug, Clone, PartialEq)]
147pub struct Subscribe<'a> {
148    pid: Option<Pid>,
149    topics: List<'a, SubscribeTopic<'a>>,
150}
151
152/// Subsack packet ([MQTT 3.9]).
153///
154/// [MQTT 3.9]: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718068
155#[derive(Debug, Clone, PartialEq)]
156pub struct Suback<'a> {
157    pub pid: Pid,
158    pub return_codes: &'a [SubscribeReturnCodes],
159}
160
161/// Unsubscribe packet ([MQTT 3.10]).
162///
163/// [MQTT 3.10]: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718072
164#[derive(Debug, Clone, PartialEq)]
165pub struct Unsubscribe<'a> {
166    pub pid: Option<Pid>,
167    pub topics: List<'a, &'a str>,
168}
169
170impl<'a> Subscribe<'a> {
171    pub fn new(topics: &'a [SubscribeTopic<'a>]) -> Self {
172        Self {
173            pid: None,
174            topics: List::Owned(topics),
175        }
176    }
177
178    pub fn topics(&self) -> impl Iterator<Item = SubscribeTopic<'_>> {
179        self.topics.into_iter()
180    }
181
182    pub fn pid(&self) -> Option<Pid> {
183        self.pid
184    }
185
186    pub(crate) fn from_buffer(
187        remaining_len: usize,
188        buf: &'a [u8],
189        offset: &mut usize,
190    ) -> Result<Self, Error> {
191        let payload_end = *offset + remaining_len;
192        let pid = Pid::from_buffer(buf, offset)?;
193
194        Ok(Subscribe {
195            pid: Some(pid),
196            topics: List::Lazy(LazyList(&buf[*offset..payload_end], PhantomData)),
197        })
198    }
199
200    /// Length: pid(2) + topic.for_each(2+len + qos(1))
201    pub(crate) fn len(&self) -> usize {
202        let mut length = 2;
203        for topic in self.topics() {
204            length += topic.topic_path.len() + 2 + 1;
205        }
206        length
207    }
208
209    pub(crate) fn to_buffer(&self, buf: &mut [u8], offset: &mut usize) -> Result<usize, Error> {
210        let header: u8 = 0b10000010;
211        check_remaining(buf, offset, 1)?;
212        write_u8(buf, offset, header)?;
213
214        let write_len = write_length(buf, offset, self.len())? + 1;
215
216        // Pid
217        self.pid.unwrap_or_default().to_buffer(buf, offset)?;
218
219        // Topics
220        for topic in self.topics() {
221            write_string(buf, offset, topic.topic_path)?;
222            write_u8(buf, offset, topic.qos.as_u8())?;
223        }
224
225        Ok(write_len)
226    }
227}
228
229impl<'a> Unsubscribe<'a> {
230    pub fn new(topics: &'a [&'a str]) -> Self {
231        Self {
232            pid: None,
233            topics: List::Owned(topics),
234        }
235    }
236
237    pub fn topics(&self) -> impl Iterator<Item = &str> {
238        self.topics.into_iter()
239    }
240
241    pub fn pid(&self) -> Option<Pid> {
242        self.pid
243    }
244
245    pub(crate) fn from_buffer(
246        remaining_len: usize,
247        buf: &'a [u8],
248        offset: &mut usize,
249    ) -> Result<Self, Error> {
250        let payload_end = *offset + remaining_len;
251        let pid = Pid::from_buffer(buf, offset)?;
252
253        Ok(Unsubscribe {
254            pid: Some(pid),
255            topics: List::Lazy(LazyList(&buf[*offset..payload_end], PhantomData)),
256        })
257    }
258
259    /// Length: pid(2) + topic.for_each(2+len)
260    pub(crate) fn len(&self) -> usize {
261        let mut length = 2;
262        for topic in self.topics() {
263            length += 2 + topic.len();
264        }
265        length
266    }
267
268    pub(crate) fn to_buffer(&self, buf: &mut [u8], offset: &mut usize) -> Result<usize, Error> {
269        let header: u8 = 0b10100010;
270
271        check_remaining(buf, offset, 1)?;
272        write_u8(buf, offset, header)?;
273
274        let write_len = write_length(buf, offset, self.len())? + 1;
275
276        // Pid
277        self.pid.unwrap_or_default().to_buffer(buf, offset)?;
278
279        for topic in self.topics() {
280            write_string(buf, offset, topic)?;
281        }
282        Ok(write_len)
283    }
284}
285
286impl<'a> Suback<'a> {
287    pub(crate) fn from_buffer(
288        _remaining_len: usize,
289        buf: &'a [u8],
290        offset: &mut usize,
291    ) -> Result<Self, Error> {
292        // FIXME:
293        // let payload_end = *offset + remaining_len;
294        let pid = Pid::from_buffer(buf, offset)?;
295
296        // let mut return_codes = LimitedVec::new();
297        // while *offset < payload_end {
298        //     let _res = return_codes.push(SubscribeReturnCodes::from_buffer(buf, offset)?);
299        // }
300
301        Ok(Suback {
302            pid,
303            return_codes: &[],
304        })
305    }
306
307    pub(crate) fn to_buffer(&self, buf: &mut [u8], offset: &mut usize) -> Result<usize, Error> {
308        let header: u8 = 0b10010000;
309        let length = 2 + self.return_codes.len();
310        check_remaining(buf, offset, 1)?;
311        write_u8(buf, offset, header)?;
312
313        let write_len = write_length(buf, offset, length)? + 1;
314        self.pid.to_buffer(buf, offset)?;
315        for rc in self.return_codes {
316            write_u8(buf, offset, rc.as_u8())?;
317        }
318        Ok(write_len)
319    }
320}