mqtt_format/v3/
subscription_request.rs1use futures::{AsyncWrite, AsyncWriteExt};
8use nom::{multi::many1_count, Parser};
9use nom_supreme::ParserExt;
10
11use super::{
12 errors::MPacketWriteError,
13 qos::{mquality_of_service, MQualityOfService},
14 strings::{mstring, MString},
15 MSResult,
16};
17
18#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub struct MSubscriptionRequests<'message> {
20 pub count: usize,
21 pub data: &'message [u8],
22}
23
24impl<'message> MSubscriptionRequests<'message> {
25 pub(crate) async fn write_to<W: AsyncWrite>(
26 &self,
27 writer: &mut std::pin::Pin<&mut W>,
28 ) -> Result<(), MPacketWriteError> {
29 writer.write_all(self.data).await?;
30 Ok(())
31 }
32 pub(crate) fn get_len(&self) -> usize {
33 self.data.len()
34 }
35}
36
37impl<'message> IntoIterator for MSubscriptionRequests<'message> {
38 type Item = MSubscriptionRequest<'message>;
39
40 type IntoIter = MSubscriptionIter<'message>;
41
42 fn into_iter(self) -> Self::IntoIter {
43 MSubscriptionIter {
44 count: self.count,
45 data: self.data,
46 }
47 }
48}
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
51pub struct MSubscriptionIter<'message> {
52 count: usize,
53 data: &'message [u8],
54}
55
56impl<'message> Iterator for MSubscriptionIter<'message> {
57 type Item = MSubscriptionRequest<'message>;
58
59 fn next(&mut self) -> Option<Self::Item> {
60 if self.count == 0 {
61 return None;
62 }
63
64 self.count -= 1;
65 match msubscriptionrequest(self.data) {
66 Ok((rest, request)) => {
67 self.data = rest;
68 Some(request)
69 }
70 Err(e) => {
71 unreachable!("Could not parse already validated sub request: {}", e)
72 }
73 }
74 }
75}
76
77pub fn msubscriptionrequests(input: &[u8]) -> MSResult<'_, MSubscriptionRequests<'_>> {
78 let data = input;
79 let (input, count) = many1_count(msubscriptionrequest)(input)?;
80
81 Ok((input, MSubscriptionRequests { count, data }))
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq)]
85pub struct MSubscriptionRequest<'message> {
86 pub topic: MString<'message>,
87 pub qos: MQualityOfService,
88}
89
90impl<'message> MSubscriptionRequest<'message> {
91 pub async fn write_to<W: AsyncWrite>(
92 &self,
93 writer: &mut std::pin::Pin<&mut W>,
94 ) -> Result<(), MPacketWriteError> {
95 MString::write_to(&self.topic, writer).await?;
96 self.qos.write_to(writer).await?;
97
98 Ok(())
99 }
100}
101
102pub fn msubscriptionrequest(input: &[u8]) -> MSResult<'_, MSubscriptionRequest<'_>> {
103 let (input, topic) = mstring(input)?;
104 let (input, qos) = nom::number::complete::u8
105 .map_res(mquality_of_service)
106 .parse(input)?;
107
108 Ok((input, MSubscriptionRequest { topic, qos }))
109}
110
111#[cfg(test)]
112mod tests {
113 use crate::v3::{strings::MString, subscription_request::MSubscriptionRequest};
114
115 use super::msubscriptionrequests;
116
117 #[test]
118 fn test_subscription_iterator() {
119 let input = &[
120 0, 3, 0x61, 0x2F, 0x62, 1, 0, 3, 0x63, 0x2F, 0x64, 2, ];
127
128 let (rest, subs) = msubscriptionrequests(input).unwrap();
129
130 assert_eq!(rest, &[]);
131
132 let mut sub_iter = subs.into_iter();
133
134 assert_eq!(
135 sub_iter.next(),
136 Some(MSubscriptionRequest {
137 qos: crate::v3::qos::MQualityOfService::AtLeastOnce,
138 topic: MString { value: "a/b" },
139 })
140 );
141
142 assert_eq!(
143 sub_iter.next(),
144 Some(MSubscriptionRequest {
145 qos: crate::v3::qos::MQualityOfService::ExactlyOnce,
146 topic: MString { value: "c/d" },
147 })
148 );
149
150 assert_eq!(sub_iter.next(), None,);
151 }
152}