mqtt_format/v3/
subscription_request.rs

1//
2//   This Source Code Form is subject to the terms of the Mozilla Public
3//   License, v. 2.0. If a copy of the MPL was not distributed with this
4//   file, You can obtain one at http://mozilla.org/MPL/2.0/.
5//
6
7use futures::{AsyncWrite, AsyncWriteExt};
8use nom::{multi::many1_count, Parser};
9use nom_supreme::ParserExt;
10
11use super::{
12    errors::MPacketWriteError,
13    qos::{mquality_of_service, MQualityOfService},
14    strings::{mstring, MString},
15    MSResult,
16};
17
18#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub struct MSubscriptionRequests<'message> {
20    pub count: usize,
21    pub data: &'message [u8],
22}
23
24impl<'message> MSubscriptionRequests<'message> {
25    pub(crate) async fn write_to<W: AsyncWrite>(
26        &self,
27        writer: &mut std::pin::Pin<&mut W>,
28    ) -> Result<(), MPacketWriteError> {
29        writer.write_all(self.data).await?;
30        Ok(())
31    }
32    pub(crate) fn get_len(&self) -> usize {
33        self.data.len()
34    }
35}
36
37impl<'message> IntoIterator for MSubscriptionRequests<'message> {
38    type Item = MSubscriptionRequest<'message>;
39
40    type IntoIter = MSubscriptionIter<'message>;
41
42    fn into_iter(self) -> Self::IntoIter {
43        MSubscriptionIter {
44            count: self.count,
45            data: self.data,
46        }
47    }
48}
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
51pub struct MSubscriptionIter<'message> {
52    count: usize,
53    data: &'message [u8],
54}
55
56impl<'message> Iterator for MSubscriptionIter<'message> {
57    type Item = MSubscriptionRequest<'message>;
58
59    fn next(&mut self) -> Option<Self::Item> {
60        if self.count == 0 {
61            return None;
62        }
63
64        self.count -= 1;
65        match msubscriptionrequest(self.data) {
66            Ok((rest, request)) => {
67                self.data = rest;
68                Some(request)
69            }
70            Err(e) => {
71                unreachable!("Could not parse already validated sub request: {}", e)
72            }
73        }
74    }
75}
76
77pub fn msubscriptionrequests(input: &[u8]) -> MSResult<'_, MSubscriptionRequests<'_>> {
78    let data = input;
79    let (input, count) = many1_count(msubscriptionrequest)(input)?;
80
81    Ok((input, MSubscriptionRequests { count, data }))
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq)]
85pub struct MSubscriptionRequest<'message> {
86    pub topic: MString<'message>,
87    pub qos: MQualityOfService,
88}
89
90impl<'message> MSubscriptionRequest<'message> {
91    pub async fn write_to<W: AsyncWrite>(
92        &self,
93        writer: &mut std::pin::Pin<&mut W>,
94    ) -> Result<(), MPacketWriteError> {
95        MString::write_to(&self.topic, writer).await?;
96        self.qos.write_to(writer).await?;
97
98        Ok(())
99    }
100}
101
102pub fn msubscriptionrequest(input: &[u8]) -> MSResult<'_, MSubscriptionRequest<'_>> {
103    let (input, topic) = mstring(input)?;
104    let (input, qos) = nom::number::complete::u8
105        .map_res(mquality_of_service)
106        .parse(input)?;
107
108    Ok((input, MSubscriptionRequest { topic, qos }))
109}
110
111#[cfg(test)]
112mod tests {
113    use crate::v3::{strings::MString, subscription_request::MSubscriptionRequest};
114
115    use super::msubscriptionrequests;
116
117    #[test]
118    fn test_subscription_iterator() {
119        let input = &[
120            0, 3, // Length 3
121            0x61, 0x2F, 0x62, // The string 'a/b'
122            1,    // QoS 1
123            0, 3, // Length 3
124            0x63, 0x2F, 0x64, // The string 'c/d'
125            2,    // QoS 2
126        ];
127
128        let (rest, subs) = msubscriptionrequests(input).unwrap();
129
130        assert_eq!(rest, &[]);
131
132        let mut sub_iter = subs.into_iter();
133
134        assert_eq!(
135            sub_iter.next(),
136            Some(MSubscriptionRequest {
137                qos: crate::v3::qos::MQualityOfService::AtLeastOnce,
138                topic: MString { value: "a/b" },
139            })
140        );
141
142        assert_eq!(
143            sub_iter.next(),
144            Some(MSubscriptionRequest {
145                qos: crate::v3::qos::MQualityOfService::ExactlyOnce,
146                topic: MString { value: "c/d" },
147            })
148        );
149
150        assert_eq!(sub_iter.next(), None,);
151    }
152}