1use std::convert::TryFrom;
2use std::io;
3use std::sync::Arc;
4
5use tokio::io::AsyncRead;
6
7use super::{
8 decode_properties, encode_properties, encode_properties_len, ErrorV5, Header, PacketType,
9 PropertyId, PropertyValue, UserProperty, VarByteInt,
10};
11use crate::{
12 decode_var_int, read_string, read_u16, read_u8, write_bytes, write_u16, write_u8, Encodable,
13 Error, Pid, QoS, TopicFilter,
14};
15
16#[derive(Debug, Clone, PartialEq, Eq, Hash)]
18#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
19pub struct Subscribe {
20 pub pid: Pid,
21 pub properties: SubscribeProperties,
22 pub topics: Vec<(TopicFilter, SubscriptionOptions)>,
23}
24
25impl Subscribe {
26 pub fn new(pid: Pid, topics: Vec<(TopicFilter, SubscriptionOptions)>) -> Self {
27 Subscribe {
28 pid,
29 properties: SubscribeProperties::default(),
30 topics,
31 }
32 }
33
34 pub async fn decode_async<T: AsyncRead + Unpin>(
35 reader: &mut T,
36 header: Header,
37 ) -> Result<Self, ErrorV5> {
38 let mut remaining_len = header.remaining_len as usize;
39 let pid = Pid::try_from(read_u16(reader).await?)?;
40 let properties = SubscribeProperties::decode_async(reader, header.typ).await?;
41 remaining_len = remaining_len
42 .checked_sub(2 + properties.encode_len())
43 .ok_or(Error::InvalidRemainingLength)?;
44 if remaining_len == 0 {
45 return Err(Error::EmptySubscription.into());
46 }
47 let mut topics = Vec::new();
48 while remaining_len > 0 {
49 let topic_filter = TopicFilter::try_from(read_string(reader).await?)?;
50 let options = {
51 let opt_byte = read_u8(reader).await?;
52 if opt_byte & 0b11000000 > 0 {
53 return Err(ErrorV5::InvalidSubscriptionOption(opt_byte));
54 }
55 let max_qos = QoS::from_u8(opt_byte & 0b11)
56 .map_err(|_| ErrorV5::InvalidSubscriptionOption(opt_byte))?;
57 let no_local = opt_byte & 0b100 == 0b100;
58 let retain_as_published = opt_byte & 0b1000 == 0b1000;
59 let retain_handling = RetainHandling::from_u8((opt_byte & 0b110000) >> 4)
60 .ok_or(ErrorV5::InvalidSubscriptionOption(opt_byte))?;
61 SubscriptionOptions {
62 max_qos,
63 no_local,
64 retain_as_published,
65 retain_handling,
66 }
67 };
68 remaining_len = remaining_len
69 .checked_sub(3 + topic_filter.len())
70 .ok_or(Error::InvalidRemainingLength)?;
71 topics.push((topic_filter, options));
72 }
73 Ok(Subscribe {
74 pid,
75 properties,
76 topics,
77 })
78 }
79}
80
81impl Encodable for Subscribe {
82 fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
83 write_u16(writer, self.pid.value())?;
84 self.properties.encode(writer)?;
85 for (topic_filter, options) in &self.topics {
86 write_bytes(writer, topic_filter.as_bytes())?;
87 write_u8(writer, options.to_u8())?;
88 }
89 Ok(())
90 }
91
92 fn encode_len(&self) -> usize {
93 2 + self.properties.encode_len()
94 + self
95 .topics
96 .iter()
97 .map(|(filter, _)| 3 + filter.len())
98 .sum::<usize>()
99 }
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
104#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
105pub struct SubscribeProperties {
106 pub subscription_id: Option<VarByteInt>,
107 pub user_properties: Vec<UserProperty>,
108}
109
110impl SubscribeProperties {
111 pub async fn decode_async<T: AsyncRead + Unpin>(
112 reader: &mut T,
113 packet_type: PacketType,
114 ) -> Result<Self, ErrorV5> {
115 let mut properties = SubscribeProperties::default();
116 decode_properties!(packet_type, properties, reader, SubscriptionIdentifier,);
117 Ok(properties)
118 }
119}
120
121impl Encodable for SubscribeProperties {
122 fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
123 encode_properties!(self, writer, SubscriptionIdentifier,);
124 Ok(())
125 }
126 fn encode_len(&self) -> usize {
127 let mut len = 0;
128 encode_properties_len!(self, len, SubscriptionIdentifier,);
129 len
130 }
131}
132
133#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
135#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
136pub struct SubscriptionOptions {
137 pub max_qos: QoS,
138 pub no_local: bool,
139 pub retain_as_published: bool,
140 pub retain_handling: RetainHandling,
141}
142
143impl SubscriptionOptions {
144 pub fn new(max_qos: QoS) -> Self {
145 SubscriptionOptions {
146 max_qos,
147 no_local: false,
148 retain_as_published: true,
149 retain_handling: RetainHandling::SendAtSubscribe,
150 }
151 }
152
153 pub fn to_u8(&self) -> u8 {
154 let mut byte = self.max_qos as u8;
155 if self.no_local {
156 byte |= 0b100;
157 }
158 if self.retain_as_published {
159 byte |= 0b1000;
160 }
161 byte |= (self.retain_handling as u8) << 4;
162 byte
163 }
164}
165
166#[repr(u8)]
168#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
169#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
170pub enum RetainHandling {
171 SendAtSubscribe = 0,
172 SendAtSubscribeIfNotExist = 1,
173 DoNotSend = 2,
174}
175
176impl RetainHandling {
177 pub fn from_u8(value: u8) -> Option<Self> {
178 let opt = match value {
179 0 => Self::SendAtSubscribe,
180 1 => Self::SendAtSubscribeIfNotExist,
181 2 => Self::DoNotSend,
182 _ => return None,
183 };
184 Some(opt)
185 }
186}
187
188#[derive(Debug, Clone, PartialEq, Eq)]
190#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
191pub struct Suback {
192 pub pid: Pid,
193 pub properties: SubackProperties,
194 pub topics: Vec<SubscribeReasonCode>,
195}
196
197impl Suback {
198 pub fn new(pid: Pid, topics: Vec<SubscribeReasonCode>) -> Self {
199 Suback {
200 pid,
201 properties: SubackProperties::default(),
202 topics,
203 }
204 }
205
206 pub async fn decode_async<T: AsyncRead + Unpin>(
207 reader: &mut T,
208 header: Header,
209 ) -> Result<Self, ErrorV5> {
210 let mut remaining_len = header.remaining_len as usize;
211 let pid = Pid::try_from(read_u16(reader).await?)?;
212 let properties = SubackProperties::decode_async(reader, header.typ).await?;
213 remaining_len = remaining_len
214 .checked_sub(2 + properties.encode_len())
215 .ok_or(Error::InvalidRemainingLength)?;
216 let mut topics = Vec::new();
217 while remaining_len > 0 {
218 let value = read_u8(reader).await?;
219 let code = SubscribeReasonCode::from_u8(value)
220 .ok_or(ErrorV5::InvalidReasonCode(header.typ, value))?;
221 topics.push(code);
222 remaining_len -= 1;
223 }
224 Ok(Suback {
225 pid,
226 properties,
227 topics,
228 })
229 }
230}
231
232impl Encodable for Suback {
233 fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
234 write_u16(writer, self.pid.value())?;
235 self.properties.encode(writer)?;
236 for reason_code in &self.topics {
237 write_u8(writer, *reason_code as u8)?;
238 }
239 Ok(())
240 }
241
242 fn encode_len(&self) -> usize {
243 2 + self.properties.encode_len() + self.topics.len()
244 }
245}
246
247#[derive(Debug, Clone, PartialEq, Eq, Default)]
249#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
250pub struct SubackProperties {
251 pub reason_string: Option<Arc<String>>,
252 pub user_properties: Vec<UserProperty>,
253}
254
255impl SubackProperties {
256 pub async fn decode_async<T: AsyncRead + Unpin>(
257 reader: &mut T,
258 packet_type: PacketType,
259 ) -> Result<Self, ErrorV5> {
260 let mut properties = SubackProperties::default();
261 decode_properties!(packet_type, properties, reader, ReasonString,);
262 Ok(properties)
263 }
264}
265
266impl Encodable for SubackProperties {
267 fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
268 encode_properties!(self, writer, ReasonString,);
269 Ok(())
270 }
271 fn encode_len(&self) -> usize {
272 let mut len = 0;
273 encode_properties_len!(self, len, ReasonString,);
274 len
275 }
276}
277
278#[repr(u8)]
296#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
297#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
298pub enum SubscribeReasonCode {
299 GrantedQoS0 = 0x00,
300 GrantedQoS1 = 0x01,
301 GrantedQoS2 = 0x02,
302 UnspecifiedError = 0x80,
303 ImplementationSpecificError = 0x83,
304 NotAuthorized = 0x87,
305 TopicFilterInvalid = 0x8F,
306 PacketIdentifierInUse = 0x91,
307 QuotaExceeded = 0x97,
308 SharedSubscriptionNotSupported = 0x9E,
309 SubscriptionIdentifiersNotSupported = 0xA1,
310 WildcardSubscriptionsNotSupported = 0xA2,
311}
312
313impl SubscribeReasonCode {
314 pub fn from_u8(value: u8) -> Option<Self> {
315 let code = match value {
316 0x00 => Self::GrantedQoS0,
317 0x01 => Self::GrantedQoS1,
318 0x02 => Self::GrantedQoS2,
319 0x80 => Self::UnspecifiedError,
320 0x83 => Self::ImplementationSpecificError,
321 0x87 => Self::NotAuthorized,
322 0x8F => Self::TopicFilterInvalid,
323 0x91 => Self::PacketIdentifierInUse,
324 0x97 => Self::QuotaExceeded,
325 0x9E => Self::SharedSubscriptionNotSupported,
326 0xA1 => Self::SubscriptionIdentifiersNotSupported,
327 0xA2 => Self::WildcardSubscriptionsNotSupported,
328 _ => return None,
329 };
330 Some(code)
331 }
332}
333
334#[derive(Debug, Clone, PartialEq, Eq, Hash)]
336#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
337pub struct Unsubscribe {
338 pub pid: Pid,
339 pub properties: UnsubscribeProperties,
340 pub topics: Vec<TopicFilter>,
341}
342
343impl Unsubscribe {
344 pub fn new(pid: Pid, topics: Vec<TopicFilter>) -> Self {
345 Unsubscribe {
346 pid,
347 properties: Default::default(),
348 topics,
349 }
350 }
351
352 pub async fn decode_async<T: AsyncRead + Unpin>(
353 reader: &mut T,
354 header: Header,
355 ) -> Result<Self, ErrorV5> {
356 let mut remaining_len = header.remaining_len as usize;
357 let pid = Pid::try_from(read_u16(reader).await?)?;
358 let (property_len, property_len_bytes) = decode_var_int(reader).await?;
359 let mut properties = UnsubscribeProperties::default();
360 let mut len = 0;
361 while property_len as usize > len {
362 let property_id = PropertyId::from_u8(read_u8(reader).await?)?;
363 match property_id {
364 PropertyId::UserProperty => {
365 let property = PropertyValue::decode_user_property(reader).await?;
366 len += 1 + 4 + property.name.len() + property.value.len();
367 properties.user_properties.push(property);
368 }
369 _ => return Err(ErrorV5::InvalidProperty(header.typ, property_id)),
370 }
371 }
372 if property_len as usize != len {
373 return Err(ErrorV5::InvalidPropertyLength(property_len));
374 }
375 remaining_len = remaining_len
376 .checked_sub(2 + property_len_bytes + len)
377 .ok_or(Error::InvalidRemainingLength)?;
378 if remaining_len == 0 {
379 return Err(Error::EmptySubscription.into());
380 }
381 let mut topics = Vec::new();
382 while remaining_len > 0 {
383 let topic_filter = TopicFilter::try_from(read_string(reader).await?)?;
384 remaining_len = remaining_len
385 .checked_sub(2 + topic_filter.len())
386 .ok_or(Error::InvalidRemainingLength)?;
387 topics.push(topic_filter);
388 }
389 Ok(Unsubscribe {
390 pid,
391 properties,
392 topics,
393 })
394 }
395}
396
397impl Encodable for Unsubscribe {
398 fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
399 write_u16(writer, self.pid.value())?;
400 self.properties.encode(writer)?;
401 for topic_filter in &self.topics {
402 write_bytes(writer, topic_filter.as_bytes())?;
403 }
404 Ok(())
405 }
406
407 fn encode_len(&self) -> usize {
408 let mut len = 2;
409 len += self.properties.encode_len();
410 len += self
411 .topics
412 .iter()
413 .map(|topic_filter| 2 + topic_filter.len())
414 .sum::<usize>();
415 len
416 }
417}
418
419#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
421#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
422pub struct UnsubscribeProperties {
423 pub user_properties: Vec<UserProperty>,
424}
425
426impl UnsubscribeProperties {
427 pub async fn decode_async<T: AsyncRead + Unpin>(
428 reader: &mut T,
429 packet_type: PacketType,
430 ) -> Result<Self, ErrorV5> {
431 let mut properties = UnsubscribeProperties::default();
432 decode_properties!(packet_type, properties, reader,);
433 Ok(properties)
434 }
435}
436
437impl Encodable for UnsubscribeProperties {
438 fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
439 encode_properties!(self, writer);
440 Ok(())
441 }
442 fn encode_len(&self) -> usize {
443 let mut len = 0;
444 encode_properties_len!(self, len);
445 len
446 }
447}
448
449impl From<Vec<UserProperty>> for UnsubscribeProperties {
450 fn from(user_properties: Vec<UserProperty>) -> UnsubscribeProperties {
451 UnsubscribeProperties { user_properties }
452 }
453}
454
455#[derive(Debug, Clone, PartialEq, Eq)]
457#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
458pub struct Unsuback {
459 pub pid: Pid,
460 pub properties: UnsubackProperties,
461 pub topics: Vec<UnsubscribeReasonCode>,
462}
463
464impl Unsuback {
465 pub fn new(pid: Pid, topics: Vec<UnsubscribeReasonCode>) -> Self {
466 Unsuback {
467 pid,
468 properties: UnsubackProperties::default(),
469 topics,
470 }
471 }
472
473 pub async fn decode_async<T: AsyncRead + Unpin>(
474 reader: &mut T,
475 header: Header,
476 ) -> Result<Self, ErrorV5> {
477 let mut remaining_len = header.remaining_len as usize;
478 let pid = Pid::try_from(read_u16(reader).await?)?;
479 let properties = UnsubackProperties::decode_async(reader, header.typ).await?;
480 remaining_len = remaining_len
481 .checked_sub(2 + properties.encode_len())
482 .ok_or(Error::InvalidRemainingLength)?;
483 let mut topics = Vec::new();
484 while remaining_len > 0 {
485 let value = read_u8(reader).await?;
486 let code = UnsubscribeReasonCode::from_u8(value)
487 .ok_or(ErrorV5::InvalidReasonCode(header.typ, value))?;
488 topics.push(code);
489 remaining_len -= 1;
490 }
491 Ok(Unsuback {
492 pid,
493 properties,
494 topics,
495 })
496 }
497}
498
499impl Encodable for Unsuback {
500 fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
501 write_u16(writer, self.pid.value())?;
502 self.properties.encode(writer)?;
503 for reason_code in &self.topics {
504 write_u8(writer, *reason_code as u8)?;
505 }
506 Ok(())
507 }
508
509 fn encode_len(&self) -> usize {
510 2 + self.properties.encode_len() + self.topics.len()
511 }
512}
513
514#[derive(Debug, Clone, PartialEq, Eq, Default)]
516#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
517pub struct UnsubackProperties {
518 pub reason_string: Option<Arc<String>>,
519 pub user_properties: Vec<UserProperty>,
520}
521
522impl UnsubackProperties {
523 pub async fn decode_async<T: AsyncRead + Unpin>(
524 reader: &mut T,
525 packet_type: PacketType,
526 ) -> Result<Self, ErrorV5> {
527 let mut properties = UnsubackProperties::default();
528 decode_properties!(packet_type, properties, reader, ReasonString,);
529 Ok(properties)
530 }
531}
532
533impl Encodable for UnsubackProperties {
534 fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
535 encode_properties!(self, writer, ReasonString,);
536 Ok(())
537 }
538 fn encode_len(&self) -> usize {
539 let mut len = 0;
540 encode_properties_len!(self, len, ReasonString,);
541 len
542 }
543}
544
545#[repr(u8)]
558#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
559#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
560pub enum UnsubscribeReasonCode {
561 Success = 0x00,
562 NoSubscriptionExisted = 0x11,
563 UnspecifiedError = 0x80,
564 ImplementationSpecificError = 0x83,
565 NotAuthorized = 0x87,
566 TopicFilterInvalid = 0x8F,
567 PacketIdentifierInUse = 0x91,
568}
569
570impl UnsubscribeReasonCode {
571 pub fn from_u8(value: u8) -> Option<Self> {
572 let code = match value {
573 0x00 => Self::Success,
574 0x11 => Self::NoSubscriptionExisted,
575 0x80 => Self::UnspecifiedError,
576 0x83 => Self::ImplementationSpecificError,
577 0x87 => Self::NotAuthorized,
578 0x8F => Self::TopicFilterInvalid,
579 0x91 => Self::PacketIdentifierInUse,
580 _ => return None,
581 };
582 Some(code)
583 }
584}