1use core::convert::TryFrom;
2
3use alloc::string::String;
4use alloc::sync::Arc;
5use alloc::vec::Vec;
6
7use crate::{
8 decode_var_int, read_string, read_u16, read_u8, write_bytes, write_u16, write_u8, AsyncRead,
9 Encodable, Error, Pid, QoS, SyncWrite, TopicFilter,
10};
11
12use super::{
13 decode_properties, encode_properties, encode_properties_len, ErrorV5, Header, PacketType,
14 PropertyId, PropertyValue, UserProperty, VarByteInt,
15};
16
17#[derive(Debug, Clone, PartialEq, Eq, Hash)]
19#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
20pub struct Subscribe {
21 pub pid: Pid,
22 pub properties: SubscribeProperties,
23 pub topics: Vec<(TopicFilter, SubscriptionOptions)>,
24}
25
26impl Subscribe {
27 pub fn new(pid: Pid, topics: Vec<(TopicFilter, SubscriptionOptions)>) -> Self {
28 Subscribe {
29 pid,
30 properties: SubscribeProperties::default(),
31 topics,
32 }
33 }
34
35 pub async fn decode_async<T: AsyncRead + Unpin>(
36 reader: &mut T,
37 header: Header,
38 ) -> Result<Self, ErrorV5> {
39 let mut remaining_len = header.remaining_len as usize;
40 let pid = Pid::try_from(read_u16(reader).await?)?;
41 let properties = SubscribeProperties::decode_async(reader, header.typ).await?;
42 remaining_len = remaining_len
43 .checked_sub(2 + properties.encode_len())
44 .ok_or(Error::InvalidRemainingLength)?;
45 if remaining_len == 0 {
46 return Err(Error::EmptySubscription.into());
47 }
48 let mut topics = Vec::new();
49 while remaining_len > 0 {
50 let topic_filter = TopicFilter::try_from(read_string(reader).await?)?;
51 let options = {
52 let opt_byte = read_u8(reader).await?;
53 if opt_byte & 0b11000000 > 0 {
54 return Err(ErrorV5::InvalidSubscriptionOption(opt_byte));
55 }
56 let max_qos = QoS::from_u8(opt_byte & 0b11)
57 .map_err(|_| ErrorV5::InvalidSubscriptionOption(opt_byte))?;
58 let no_local = opt_byte & 0b100 == 0b100;
59 let retain_as_published = opt_byte & 0b1000 == 0b1000;
60 let retain_handling = RetainHandling::from_u8((opt_byte & 0b110000) >> 4)
61 .ok_or(ErrorV5::InvalidSubscriptionOption(opt_byte))?;
62 SubscriptionOptions {
63 max_qos,
64 no_local,
65 retain_as_published,
66 retain_handling,
67 }
68 };
69 remaining_len = remaining_len
70 .checked_sub(3 + topic_filter.len())
71 .ok_or(Error::InvalidRemainingLength)?;
72 topics.push((topic_filter, options));
73 }
74 Ok(Subscribe {
75 pid,
76 properties,
77 topics,
78 })
79 }
80}
81
82impl Encodable for Subscribe {
83 fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
84 write_u16(writer, self.pid.value())?;
85 self.properties.encode(writer)?;
86 for (topic_filter, options) in &self.topics {
87 write_bytes(writer, topic_filter.as_bytes())?;
88 write_u8(writer, options.to_u8())?;
89 }
90 Ok(())
91 }
92
93 fn encode_len(&self) -> usize {
94 2 + self.properties.encode_len()
95 + self
96 .topics
97 .iter()
98 .map(|(filter, _)| 3 + filter.len())
99 .sum::<usize>()
100 }
101}
102
103#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
105#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
106pub struct SubscribeProperties {
107 pub subscription_id: Option<VarByteInt>,
108 pub user_properties: Vec<UserProperty>,
109}
110
111impl SubscribeProperties {
112 pub async fn decode_async<T: AsyncRead + Unpin>(
113 reader: &mut T,
114 packet_type: PacketType,
115 ) -> Result<Self, ErrorV5> {
116 let mut properties = SubscribeProperties::default();
117 decode_properties!(packet_type, properties, reader, SubscriptionIdentifier,);
118 Ok(properties)
119 }
120}
121
122impl Encodable for SubscribeProperties {
123 fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
124 encode_properties!(self, writer, SubscriptionIdentifier,);
125 Ok(())
126 }
127 fn encode_len(&self) -> usize {
128 let mut len = 0;
129 encode_properties_len!(self, len, SubscriptionIdentifier,);
130 len
131 }
132}
133
134#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
136#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
137pub struct SubscriptionOptions {
138 pub max_qos: QoS,
139 pub no_local: bool,
140 pub retain_as_published: bool,
141 pub retain_handling: RetainHandling,
142}
143
144impl SubscriptionOptions {
145 pub fn new(max_qos: QoS) -> Self {
146 SubscriptionOptions {
147 max_qos,
148 no_local: false,
149 retain_as_published: true,
150 retain_handling: RetainHandling::SendAtSubscribe,
151 }
152 }
153
154 pub fn to_u8(&self) -> u8 {
155 let mut byte = self.max_qos as u8;
156 if self.no_local {
157 byte |= 0b100;
158 }
159 if self.retain_as_published {
160 byte |= 0b1000;
161 }
162 byte |= (self.retain_handling as u8) << 4;
163 byte
164 }
165}
166
167#[repr(u8)]
169#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
170#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
171pub enum RetainHandling {
172 SendAtSubscribe = 0,
173 SendAtSubscribeIfNotExist = 1,
174 DoNotSend = 2,
175}
176
177impl RetainHandling {
178 pub fn from_u8(value: u8) -> Option<Self> {
179 let opt = match value {
180 0 => Self::SendAtSubscribe,
181 1 => Self::SendAtSubscribeIfNotExist,
182 2 => Self::DoNotSend,
183 _ => return None,
184 };
185 Some(opt)
186 }
187}
188
189#[derive(Debug, Clone, PartialEq, Eq)]
191#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
192pub struct Suback {
193 pub pid: Pid,
194 pub properties: SubackProperties,
195 pub topics: Vec<SubscribeReasonCode>,
196}
197
198impl Suback {
199 pub fn new(pid: Pid, topics: Vec<SubscribeReasonCode>) -> Self {
200 Suback {
201 pid,
202 properties: SubackProperties::default(),
203 topics,
204 }
205 }
206
207 pub async fn decode_async<T: AsyncRead + Unpin>(
208 reader: &mut T,
209 header: Header,
210 ) -> Result<Self, ErrorV5> {
211 let mut remaining_len = header.remaining_len as usize;
212 let pid = Pid::try_from(read_u16(reader).await?)?;
213 let properties = SubackProperties::decode_async(reader, header.typ).await?;
214 remaining_len = remaining_len
215 .checked_sub(2 + properties.encode_len())
216 .ok_or(Error::InvalidRemainingLength)?;
217 let mut topics = Vec::new();
218 while remaining_len > 0 {
219 let value = read_u8(reader).await?;
220 let code = SubscribeReasonCode::from_u8(value)
221 .ok_or(ErrorV5::InvalidReasonCode(header.typ, value))?;
222 topics.push(code);
223 remaining_len -= 1;
224 }
225 Ok(Suback {
226 pid,
227 properties,
228 topics,
229 })
230 }
231}
232
233impl Encodable for Suback {
234 fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
235 write_u16(writer, self.pid.value())?;
236 self.properties.encode(writer)?;
237 for reason_code in &self.topics {
238 write_u8(writer, *reason_code as u8)?;
239 }
240 Ok(())
241 }
242
243 fn encode_len(&self) -> usize {
244 2 + self.properties.encode_len() + self.topics.len()
245 }
246}
247
248#[derive(Debug, Clone, PartialEq, Eq, Default)]
250#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
251pub struct SubackProperties {
252 pub reason_string: Option<Arc<String>>,
253 pub user_properties: Vec<UserProperty>,
254}
255
256impl SubackProperties {
257 pub async fn decode_async<T: AsyncRead + Unpin>(
258 reader: &mut T,
259 packet_type: PacketType,
260 ) -> Result<Self, ErrorV5> {
261 let mut properties = SubackProperties::default();
262 decode_properties!(packet_type, properties, reader, ReasonString,);
263 Ok(properties)
264 }
265}
266
267impl Encodable for SubackProperties {
268 fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
269 encode_properties!(self, writer, ReasonString,);
270 Ok(())
271 }
272 fn encode_len(&self) -> usize {
273 let mut len = 0;
274 encode_properties_len!(self, len, ReasonString,);
275 len
276 }
277}
278
279#[repr(u8)]
297#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
298#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
299pub enum SubscribeReasonCode {
300 GrantedQoS0 = 0x00,
301 GrantedQoS1 = 0x01,
302 GrantedQoS2 = 0x02,
303 UnspecifiedError = 0x80,
304 ImplementationSpecificError = 0x83,
305 NotAuthorized = 0x87,
306 TopicFilterInvalid = 0x8F,
307 PacketIdentifierInUse = 0x91,
308 QuotaExceeded = 0x97,
309 SharedSubscriptionNotSupported = 0x9E,
310 SubscriptionIdentifiersNotSupported = 0xA1,
311 WildcardSubscriptionsNotSupported = 0xA2,
312}
313
314impl SubscribeReasonCode {
315 pub fn from_u8(value: u8) -> Option<Self> {
316 let code = match value {
317 0x00 => Self::GrantedQoS0,
318 0x01 => Self::GrantedQoS1,
319 0x02 => Self::GrantedQoS2,
320 0x80 => Self::UnspecifiedError,
321 0x83 => Self::ImplementationSpecificError,
322 0x87 => Self::NotAuthorized,
323 0x8F => Self::TopicFilterInvalid,
324 0x91 => Self::PacketIdentifierInUse,
325 0x97 => Self::QuotaExceeded,
326 0x9E => Self::SharedSubscriptionNotSupported,
327 0xA1 => Self::SubscriptionIdentifiersNotSupported,
328 0xA2 => Self::WildcardSubscriptionsNotSupported,
329 _ => return None,
330 };
331 Some(code)
332 }
333}
334
335#[derive(Debug, Clone, PartialEq, Eq, Hash)]
337#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
338pub struct Unsubscribe {
339 pub pid: Pid,
340 pub properties: UnsubscribeProperties,
341 pub topics: Vec<TopicFilter>,
342}
343
344impl Unsubscribe {
345 pub fn new(pid: Pid, topics: Vec<TopicFilter>) -> Self {
346 Unsubscribe {
347 pid,
348 properties: Default::default(),
349 topics,
350 }
351 }
352
353 pub async fn decode_async<T: AsyncRead + Unpin>(
354 reader: &mut T,
355 header: Header,
356 ) -> Result<Self, ErrorV5> {
357 let mut remaining_len = header.remaining_len as usize;
358 let pid = Pid::try_from(read_u16(reader).await?)?;
359 let (property_len, property_len_bytes) = decode_var_int(reader).await?;
360 let mut properties = UnsubscribeProperties::default();
361 let mut len = 0;
362 while property_len as usize > len {
363 let property_id = PropertyId::from_u8(read_u8(reader).await?)?;
364 match property_id {
365 PropertyId::UserProperty => {
366 let property = PropertyValue::decode_user_property(reader).await?;
367 len += 1 + 4 + property.name.len() + property.value.len();
368 properties.user_properties.push(property);
369 }
370 _ => return Err(ErrorV5::InvalidProperty(header.typ, property_id)),
371 }
372 }
373 if property_len as usize != len {
374 return Err(ErrorV5::InvalidPropertyLength(property_len));
375 }
376 remaining_len = remaining_len
377 .checked_sub(2 + property_len_bytes + len)
378 .ok_or(Error::InvalidRemainingLength)?;
379 if remaining_len == 0 {
380 return Err(Error::EmptySubscription.into());
381 }
382 let mut topics = Vec::new();
383 while remaining_len > 0 {
384 let topic_filter = TopicFilter::try_from(read_string(reader).await?)?;
385 remaining_len = remaining_len
386 .checked_sub(2 + topic_filter.len())
387 .ok_or(Error::InvalidRemainingLength)?;
388 topics.push(topic_filter);
389 }
390 Ok(Unsubscribe {
391 pid,
392 properties,
393 topics,
394 })
395 }
396}
397
398impl Encodable for Unsubscribe {
399 fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
400 write_u16(writer, self.pid.value())?;
401 self.properties.encode(writer)?;
402 for topic_filter in &self.topics {
403 write_bytes(writer, topic_filter.as_bytes())?;
404 }
405 Ok(())
406 }
407
408 fn encode_len(&self) -> usize {
409 let mut len = 2;
410 len += self.properties.encode_len();
411 len += self
412 .topics
413 .iter()
414 .map(|topic_filter| 2 + topic_filter.len())
415 .sum::<usize>();
416 len
417 }
418}
419
420#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
422#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
423pub struct UnsubscribeProperties {
424 pub user_properties: Vec<UserProperty>,
425}
426
427impl UnsubscribeProperties {
428 pub async fn decode_async<T: AsyncRead + Unpin>(
429 reader: &mut T,
430 packet_type: PacketType,
431 ) -> Result<Self, ErrorV5> {
432 let mut properties = UnsubscribeProperties::default();
433 decode_properties!(packet_type, properties, reader,);
434 Ok(properties)
435 }
436}
437
438impl Encodable for UnsubscribeProperties {
439 fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
440 encode_properties!(self, writer);
441 Ok(())
442 }
443 fn encode_len(&self) -> usize {
444 let mut len = 0;
445 encode_properties_len!(self, len);
446 len
447 }
448}
449
450impl From<Vec<UserProperty>> for UnsubscribeProperties {
451 fn from(user_properties: Vec<UserProperty>) -> UnsubscribeProperties {
452 UnsubscribeProperties { user_properties }
453 }
454}
455
456#[derive(Debug, Clone, PartialEq, Eq)]
458#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
459pub struct Unsuback {
460 pub pid: Pid,
461 pub properties: UnsubackProperties,
462 pub topics: Vec<UnsubscribeReasonCode>,
463}
464
465impl Unsuback {
466 pub fn new(pid: Pid, topics: Vec<UnsubscribeReasonCode>) -> Self {
467 Unsuback {
468 pid,
469 properties: UnsubackProperties::default(),
470 topics,
471 }
472 }
473
474 pub async fn decode_async<T: AsyncRead + Unpin>(
475 reader: &mut T,
476 header: Header,
477 ) -> Result<Self, ErrorV5> {
478 let mut remaining_len = header.remaining_len as usize;
479 let pid = Pid::try_from(read_u16(reader).await?)?;
480 let properties = UnsubackProperties::decode_async(reader, header.typ).await?;
481 remaining_len = remaining_len
482 .checked_sub(2 + properties.encode_len())
483 .ok_or(Error::InvalidRemainingLength)?;
484 let mut topics = Vec::new();
485 while remaining_len > 0 {
486 let value = read_u8(reader).await?;
487 let code = UnsubscribeReasonCode::from_u8(value)
488 .ok_or(ErrorV5::InvalidReasonCode(header.typ, value))?;
489 topics.push(code);
490 remaining_len -= 1;
491 }
492 Ok(Unsuback {
493 pid,
494 properties,
495 topics,
496 })
497 }
498}
499
500impl Encodable for Unsuback {
501 fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
502 write_u16(writer, self.pid.value())?;
503 self.properties.encode(writer)?;
504 for reason_code in &self.topics {
505 write_u8(writer, *reason_code as u8)?;
506 }
507 Ok(())
508 }
509
510 fn encode_len(&self) -> usize {
511 2 + self.properties.encode_len() + self.topics.len()
512 }
513}
514
515#[derive(Debug, Clone, PartialEq, Eq, Default)]
517#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
518pub struct UnsubackProperties {
519 pub reason_string: Option<Arc<String>>,
520 pub user_properties: Vec<UserProperty>,
521}
522
523impl UnsubackProperties {
524 pub async fn decode_async<T: AsyncRead + Unpin>(
525 reader: &mut T,
526 packet_type: PacketType,
527 ) -> Result<Self, ErrorV5> {
528 let mut properties = UnsubackProperties::default();
529 decode_properties!(packet_type, properties, reader, ReasonString,);
530 Ok(properties)
531 }
532}
533
534impl Encodable for UnsubackProperties {
535 fn encode<W: SyncWrite>(&self, writer: &mut W) -> Result<(), Error> {
536 encode_properties!(self, writer, ReasonString,);
537 Ok(())
538 }
539 fn encode_len(&self) -> usize {
540 let mut len = 0;
541 encode_properties_len!(self, len, ReasonString,);
542 len
543 }
544}
545
546#[repr(u8)]
559#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
560#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
561pub enum UnsubscribeReasonCode {
562 Success = 0x00,
563 NoSubscriptionExisted = 0x11,
564 UnspecifiedError = 0x80,
565 ImplementationSpecificError = 0x83,
566 NotAuthorized = 0x87,
567 TopicFilterInvalid = 0x8F,
568 PacketIdentifierInUse = 0x91,
569}
570
571impl UnsubscribeReasonCode {
572 pub fn from_u8(value: u8) -> Option<Self> {
573 let code = match value {
574 0x00 => Self::Success,
575 0x11 => Self::NoSubscriptionExisted,
576 0x80 => Self::UnspecifiedError,
577 0x83 => Self::ImplementationSpecificError,
578 0x87 => Self::NotAuthorized,
579 0x8F => Self::TopicFilterInvalid,
580 0x91 => Self::PacketIdentifierInUse,
581 _ => return None,
582 };
583 Some(code)
584 }
585}