Skip to main content

embedded_mqtt/payload/
subscribe.rs

1use core::{
2	fmt,
3	result::Result,
4	iter::Iterator,
5	convert::{From, TryFrom},
6};
7
8use crate::{
9	codec::{self, Decodable, Encodable},
10	error::{DecodeError, EncodeError},
11	qos,
12	status::Status,
13};
14
15pub struct Iter<'a> {
16	offset: usize,
17	sub: &'a Subscribe<'a>,
18}
19
20impl<'a> Iter<'a> {
21	fn new(sub: &'a Subscribe<'a>) -> Self {
22		Iter {
23			offset: 0,
24			sub,
25		}
26	}
27}
28
29impl<'a> Iterator for Iter<'a> {
30	type Item = (&'a str, qos::QoS);
31	fn next(&mut self) -> Option<Self::Item> {
32		match self.sub {
33			&Subscribe::Encode(topics) => {
34				// Offset is an index into the encode slice
35				if self.offset >= topics.len() {
36					return None
37				}
38
39				let item = topics[self.offset];
40				self.offset += 1;
41
42				Some(item)
43			},
44			&Subscribe::Decode(bytes) => {
45				// Offset is a byte offset in the byte slice
46				if self.offset >= bytes.len() {
47					return None
48				}
49
50				// &bytes[offset..] points to a length, string and QoS
51				let (o, item) = parse_subscription(&bytes[self.offset..]).expect("already validated").unwrap();
52				self.offset += o;
53
54				Some(item)
55			}
56		}
57	}
58}
59
60pub enum Subscribe<'a> {
61	Encode(&'a [(&'a str, qos::QoS)]),
62	Decode(&'a [u8]),
63}
64
65impl<'a> Subscribe<'a> {
66	pub fn new(topics: &'a [(&'a str, qos::QoS)]) -> Self {
67		Subscribe::Encode(topics)
68	}
69
70	pub fn topics(&self) -> Iter {
71		Iter::new(self)
72	}
73}
74
75impl<'a> fmt::Debug for Subscribe<'a> {
76	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
77		write!(f, "Subscribe {{\n")?;
78		self.topics()
79			.fold(Ok(()), |acc, (topic, qos)| {
80				acc?;
81				write!(f, "    (\n        Topic: {:#?},\n        QoS: {:#?}\n    )\n", topic, qos)
82			})?;
83		write!(f, "}}")?;
84
85		Ok(())
86	}
87}
88
89fn parse_subscription<'a>(bytes: &'a [u8]) -> Result<Status<(usize, (&'a str, qos::QoS))>, DecodeError> {
90	let offset = 0;
91
92	let (offset, topic) = {
93		let (o, topic) = complete!(codec::string::parse_string(&bytes[offset..]));
94		(offset + o, topic)
95	};
96
97	let (offset, qos) = {
98		let (o, qos) = complete!(codec::values::parse_u8(&bytes[offset..]));
99		let qos = qos::QoS::try_from(qos)?;
100		(offset + o, qos)
101	};
102
103	Ok(Status::Complete((offset, (topic, qos))))
104}
105
106impl<'a> Decodable<'a> for Subscribe<'a> {
107	fn decode(bytes: &'a [u8]) -> Result<Status<(usize, Self)>, DecodeError> {
108		let mut offset = 0;
109		while offset < bytes.len() {
110			let o = match parse_subscription(&bytes[offset..]) {
111				Err(e) => return Err(e),
112				Ok(Status::Partial(..)) => return Err(DecodeError::InvalidLength),
113				Ok(Status::Complete((o, _))) => o,
114			};
115			offset += o;
116		}
117
118		Ok(Status::Complete((bytes.len(), Subscribe::Decode(bytes))))
119	}
120}
121
122impl<'a> Encodable for Subscribe<'a> {
123	fn encoded_len(&self) -> usize {
124		self.topics()
125			.map(|topic| {
126				topic.0.encoded_len() + 1
127			})
128			.sum()
129	}
130
131	fn encode(&self, bytes: &mut [u8]) -> Result<usize, EncodeError> {
132		self.topics()
133			.fold(Ok(0), |acc, (topic, qos)| {
134				let offset = acc?;
135				let offset = {
136					let o = codec::string::encode_string(topic, &mut bytes[offset..])?;
137					offset + o
138				};
139				let offset = {
140					let o = codec::values::encode_u8(u8::from(qos), &mut bytes[offset..])?;
141					offset + o
142				};
143				Ok(offset)
144			})
145	}
146}
147
#[cfg(test)]
mod tests {
	use super::*;

	/// Asserts that `sub` yields ("a", AtMostOnce), ("b", AtLeastOnce) and
	/// ("c", ExactlyOnce), in that order, and then ends.
	fn assert_yields_abc(sub: &Subscribe) {
		let mut iter = sub.topics();

		assert_eq!(iter.next(), Some(("a", qos::QoS::AtMostOnce)));
		assert_eq!(iter.next(), Some(("b", qos::QoS::AtLeastOnce)));
		assert_eq!(iter.next(), Some(("c", qos::QoS::ExactlyOnce)));
		assert_eq!(iter.next(), None);
	}

	// Iterates a payload built from in-memory topics (the `Encode` variant).
	#[test]
	fn decode_literal() {
		let topics = [
			("a", qos::QoS::AtMostOnce),
			("b", qos::QoS::AtLeastOnce),
			("c", qos::QoS::ExactlyOnce),
		];

		assert_yields_abc(&Subscribe::new(&topics));
	}

	// Decodes raw bytes and iterates the resulting `Decode` variant.
	#[test]
	fn decode_bytes() {
		let bytes = [
			0b0000_0000, // topic length = 1 (two bytes)
			0b0000_0001,
			0x61,        // 'a'
			0b0000_0000, // AtMostOnce

			0b0000_0000, // topic length = 1 (two bytes)
			0b0000_0001,
			0x62,        // 'b'
			0b0000_0001, // AtLeastOnce

			0b0000_0000, // topic length = 1 (two bytes)
			0b0000_0001,
			0x63,        // 'c'
			0b0000_0010, // ExactlyOnce
		];

		let (_, sub) = Subscribe::decode(&bytes).expect("valid").unwrap();

		assert_yields_abc(&sub);
	}

	// A payload whose last entry lacks its QoS byte must be rejected.
	#[test]
	fn decode_bytes_error() {
		let bytes = [
			0b0000_0000, // topic length = 1 (two bytes)
			0b0000_0001,
			0x61,        // 'a'
			0b0000_0000, // AtMostOnce

			0b0000_0000, // topic length = 1 (two bytes)
			0b0000_0001,
			0x62,        // 'b'
			0b0000_0001, // AtLeastOnce

			0b0000_0000, // topic length = 1 (two bytes)
			0b0000_0001,
			0x63,        // 'c'

			// The QoS byte for 'c' (0b0000_0010, ExactlyOnce) is
			// intentionally omitted.
		];

		let sub = Subscribe::decode(&bytes);
		assert!(sub.is_err());
		assert_eq!(sub.unwrap_err(), DecodeError::InvalidLength);
	}
}