mqtt_proto/v5/
publish.rs

1use std::convert::TryFrom;
2use std::io;
3use std::sync::Arc;
4
5use bytes::Bytes;
6use simdutf8::basic::from_utf8;
7use tokio::io::{AsyncRead, AsyncReadExt};
8
9use super::{
10    decode_properties, encode_properties, encode_properties_len, ErrorV5, Header, PacketType,
11    UserProperty, VarByteInt,
12};
13use crate::{
14    read_string, read_u16, read_u8, write_bytes, write_u16, write_u8, Encodable, Error, Pid, QoS,
15    QosPid, TopicName,
16};
17
/// Body type of PUBLISH packet.
///
/// `dup`, `retain` and the QoS level come from the packet header when the
/// body is decoded; the `Encodable` impl for this type writes only the topic
/// name, the packet identifier (QoS 1/2), the properties and the payload.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Publish {
    /// DUP flag, copied from the header on decode.
    pub dup: bool,
    /// RETAIN flag, copied from the header on decode.
    pub retain: bool,
    /// QoS level together with the packet identifier for levels 1 and 2.
    pub qos_pid: QosPid,
    /// Topic name the message is published to.
    pub topic_name: TopicName,
    /// Application payload: the bytes that follow the property list.
    pub payload: Bytes,
    /// PUBLISH property list.
    pub properties: PublishProperties,
}
28
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for Publish {
    /// Generates an arbitrary `Publish` from unstructured fuzzer input.
    ///
    /// `Bytes` is produced by drawing a `Vec<u8>` and converting it.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        // The draw order below determines which input bytes feed which
        // field, so it must stay: dup, retain, qos_pid, topic_name,
        // properties, payload.
        let dup = u.arbitrary()?;
        let retain = u.arbitrary()?;
        let qos_pid = u.arbitrary()?;
        let topic_name = u.arbitrary()?;
        let properties = u.arbitrary()?;
        let payload = Vec::<u8>::arbitrary(u).map(Bytes::from)?;
        Ok(Publish {
            dup,
            retain,
            qos_pid,
            topic_name,
            properties,
            payload,
        })
    }
}
42
43impl Publish {
44    pub fn new(qos_pid: QosPid, topic_name: TopicName, payload: Bytes) -> Self {
45        Publish {
46            dup: false,
47            retain: false,
48            qos_pid,
49            topic_name,
50            payload,
51            properties: PublishProperties::default(),
52        }
53    }
54
55    pub async fn decode_async<T: AsyncRead + Unpin>(
56        reader: &mut T,
57        header: Header,
58    ) -> Result<Self, ErrorV5> {
59        let mut remaining_len = header.remaining_len as usize;
60        let topic_name = read_string(reader).await?;
61        remaining_len = remaining_len
62            .checked_sub(2 + topic_name.len())
63            .ok_or(Error::InvalidRemainingLength)?;
64        let qos_pid = match header.qos {
65            QoS::Level0 => QosPid::Level0,
66            QoS::Level1 => {
67                remaining_len = remaining_len
68                    .checked_sub(2)
69                    .ok_or(Error::InvalidRemainingLength)?;
70                QosPid::Level1(Pid::try_from(read_u16(reader).await?)?)
71            }
72            QoS::Level2 => {
73                remaining_len = remaining_len
74                    .checked_sub(2)
75                    .ok_or(Error::InvalidRemainingLength)?;
76                QosPid::Level2(Pid::try_from(read_u16(reader).await?)?)
77            }
78        };
79        let properties = PublishProperties::decode_async(reader, header.typ).await?;
80        remaining_len = remaining_len
81            .checked_sub(properties.encode_len())
82            .ok_or(Error::InvalidRemainingLength)?;
83        let payload = if remaining_len > 0 {
84            let mut data = vec![0u8; remaining_len];
85            reader
86                .read_exact(&mut data)
87                .await
88                .map_err(|err| Error::IoError(err.kind(), err.to_string()))?;
89            if properties.payload_is_utf8 == Some(true) && from_utf8(&data).is_err() {
90                return Err(ErrorV5::InvalidPayloadFormat);
91            }
92            data
93        } else {
94            Vec::new()
95        };
96        Ok(Publish {
97            dup: header.dup,
98            qos_pid,
99            retain: header.retain,
100            topic_name: TopicName::try_from(topic_name)?,
101            properties,
102            payload: Bytes::from(payload),
103        })
104    }
105}
106
107impl Encodable for Publish {
108    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
109        write_bytes(writer, self.topic_name.as_bytes())?;
110        match self.qos_pid {
111            QosPid::Level0 => {}
112            QosPid::Level1(pid) | QosPid::Level2(pid) => {
113                write_u16(writer, pid.value())?;
114            }
115        }
116        self.properties.encode(writer)?;
117        writer.write_all(self.payload.as_ref())?;
118        Ok(())
119    }
120
121    fn encode_len(&self) -> usize {
122        let mut len = 2 + self.topic_name.len();
123        match self.qos_pid {
124            QosPid::Level0 => {}
125            QosPid::Level1(_) | QosPid::Level2(_) => {
126                len += 2;
127            }
128        }
129        len += self.properties.encode_len();
130        len += self.payload.len();
131        len
132    }
133}
134
/// Property list for PUBLISH packet.
///
/// Every field defaults to `None` / empty through the `Default` derive.
#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
pub struct PublishProperties {
    /// When `Some(true)`, `Publish::decode_async` rejects a non-empty payload
    /// that is not valid UTF-8.
    pub payload_is_utf8: Option<bool>,
    /// Message expiry interval, if present.
    // NOTE(review): presumably expressed in seconds — confirm against the MQTT v5 spec.
    pub message_expiry_interval: Option<u32>,
    /// Topic alias, if present.
    pub topic_alias: Option<u16>,
    /// Response topic, if present.
    pub response_topic: Option<TopicName>,
    /// Correlation data, if present.
    pub correlation_data: Option<Bytes>,
    /// User properties; empty by default.
    pub user_properties: Vec<UserProperty>,
    // FIXME: this is a list of identifiers
    pub subscription_id: Option<VarByteInt>,
    /// Content type, if present.
    pub content_type: Option<Arc<String>>,
}
148
#[cfg(feature = "arbitrary")]
impl<'a> arbitrary::Arbitrary<'a> for PublishProperties {
    /// Generates an arbitrary `PublishProperties` from unstructured fuzzer
    /// input.
    ///
    /// `correlation_data` is drawn as an `Option<Vec<u8>>` and converted to
    /// `Bytes`.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        // Draws happen in field declaration order; keep it that way so the
        // same input maps to the same value.
        let payload_is_utf8 = u.arbitrary()?;
        let message_expiry_interval = u.arbitrary()?;
        let topic_alias = u.arbitrary()?;
        let response_topic = u.arbitrary()?;
        let correlation_data = Option::<Vec<u8>>::arbitrary(u)?.map(Bytes::from);
        let user_properties = u.arbitrary()?;
        let subscription_id = u.arbitrary()?;
        let content_type = u.arbitrary()?;
        Ok(PublishProperties {
            payload_is_utf8,
            message_expiry_interval,
            topic_alias,
            response_topic,
            correlation_data,
            user_properties,
            subscription_id,
            content_type,
        })
    }
}
164
impl PublishProperties {
    /// Decodes the PUBLISH property list from `reader`.
    ///
    /// Starts from `PublishProperties::default()` and delegates all parsing to
    /// the `decode_properties!` macro, which receives `packet_type`, the value
    /// being filled, the reader and the list of accepted property names.
    // NOTE(review): `user_properties` is not in the list below, so the macro
    // presumably handles it implicitly — confirm against the macro definition.
    pub async fn decode_async<T: AsyncRead + Unpin>(
        reader: &mut T,
        packet_type: PacketType,
    ) -> Result<Self, ErrorV5> {
        let mut properties = PublishProperties::default();
        decode_properties!(
            packet_type,
            properties,
            reader,
            PayloadFormatIndicator,
            MessageExpiryInterval,
            TopicAlias,
            ResponseTopic,
            CorrelationData,
            SubscriptionIdentifier,
            ContentType,
        );
        Ok(properties)
    }
}
186
impl Encodable for PublishProperties {
    /// Writes the property list to `writer` through the `encode_properties!`
    /// macro, using the same property names as the decoder.
    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
        encode_properties!(
            self,
            writer,
            PayloadFormatIndicator,
            MessageExpiryInterval,
            TopicAlias,
            ResponseTopic,
            CorrelationData,
            SubscriptionIdentifier,
            ContentType,
        );
        Ok(())
    }
    /// Returns the encoded size accumulated by `encode_properties_len!`.
    ///
    /// `Publish::decode_async` subtracts this value from the remaining length
    /// after decoding the properties.
    // NOTE(review): that use suggests the result is the full on-wire size of
    // the property section, length prefix included — confirm against the macro.
    fn encode_len(&self) -> usize {
        let mut len = 0;
        encode_properties_len!(
            self,
            len,
            PayloadFormatIndicator,
            MessageExpiryInterval,
            TopicAlias,
            ResponseTopic,
            CorrelationData,
            SubscriptionIdentifier,
            ContentType,
        );
        len
    }
}
218
/// Body type for PUBACK packet.
///
/// Holds a packet identifier, a reason code and a property list. The
/// `Encodable` impl omits the reason code and/or the properties when they
/// carry their default values.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct Puback {
    /// Packet identifier; always the first two bytes of the encoded body.
    pub pid: Pid,
    /// Reason code; not written when it is `Success` and `properties` is default.
    pub reason_code: PubackReasonCode,
    /// Property list; not written when equal to `PubackProperties::default()`.
    pub properties: PubackProperties,
}
227
228impl Puback {
229    pub fn new(pid: Pid, reason_code: PubackReasonCode) -> Self {
230        Puback {
231            pid,
232            reason_code,
233            properties: PubackProperties::default(),
234        }
235    }
236
237    pub fn new_success(pid: Pid) -> Self {
238        Self::new(pid, PubackReasonCode::Success)
239    }
240
241    pub async fn decode_async<T: AsyncRead + Unpin>(
242        reader: &mut T,
243        header: Header,
244    ) -> Result<Self, ErrorV5> {
245        let pid = Pid::try_from(read_u16(reader).await?)?;
246        let (reason_code, properties) = if header.remaining_len == 2 {
247            (PubackReasonCode::Success, PubackProperties::default())
248        } else if header.remaining_len == 3 {
249            let reason_byte = read_u8(reader).await?;
250            let reason_code = PubackReasonCode::from_u8(reason_byte)
251                .ok_or(ErrorV5::InvalidReasonCode(header.typ, reason_byte))?;
252            (reason_code, PubackProperties::default())
253        } else {
254            let reason_byte = read_u8(reader).await?;
255            let reason_code = PubackReasonCode::from_u8(reason_byte)
256                .ok_or(ErrorV5::InvalidReasonCode(header.typ, reason_byte))?;
257            let properties = PubackProperties::decode_async(reader, header.typ).await?;
258            (reason_code, properties)
259        };
260        Ok(Puback {
261            pid,
262            reason_code,
263            properties,
264        })
265    }
266}
267
268impl Encodable for Puback {
269    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
270        write_u16(writer, self.pid.value())?;
271        if self.properties == PubackProperties::default() {
272            if self.reason_code != PubackReasonCode::Success {
273                write_u8(writer, self.reason_code as u8)?;
274            }
275        } else {
276            write_u8(writer, self.reason_code as u8)?;
277            self.properties.encode(writer)?;
278        }
279        Ok(())
280    }
281
282    fn encode_len(&self) -> usize {
283        if self.properties == PubackProperties::default() {
284            if self.reason_code == PubackReasonCode::Success {
285                2
286            } else {
287                3
288            }
289        } else {
290            3 + self.properties.encode_len()
291        }
292    }
293}
294
/// Property list for PUBACK packet.
///
/// Both fields are empty by default (`None` / empty vector).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct PubackProperties {
    /// Optional reason string (the `ReasonString` property named in the codec macros).
    pub reason_string: Option<Arc<String>>,
    /// User properties; empty by default.
    pub user_properties: Vec<UserProperty>,
}
302
impl PubackProperties {
    /// Decodes the PUBACK property list from `reader`.
    ///
    /// Starts from `PubackProperties::default()` and delegates parsing to the
    /// `decode_properties!` macro with `ReasonString` as the only listed
    /// property; `packet_type` is passed through to the macro.
    // NOTE(review): `user_properties` is not listed, so the macro presumably
    // fills it implicitly — confirm against the macro definition.
    pub async fn decode_async<T: AsyncRead + Unpin>(
        reader: &mut T,
        packet_type: PacketType,
    ) -> Result<Self, ErrorV5> {
        let mut properties = PubackProperties::default();
        decode_properties!(packet_type, properties, reader, ReasonString,);
        Ok(properties)
    }
}
313
impl Encodable for PubackProperties {
    /// Writes the property list to `writer` through the `encode_properties!`
    /// macro (listed property: `ReasonString`).
    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
        encode_properties!(self, writer, ReasonString,);
        Ok(())
    }
    /// Returns the encoded size accumulated by `encode_properties_len!`.
    // NOTE(review): presumably matches the byte count written by `encode` —
    // confirm against the macro definition.
    fn encode_len(&self) -> usize {
        let mut len = 0;
        encode_properties_len!(self, len, ReasonString,);
        len
    }
}
325
/// Reason code for PUBACK packet.
///
/// | Dec |  Hex | Reason Code name              | Description                                                                                                        |
/// |-----|------|-------------------------------|--------------------------------------------------------------------------------------------------------------------|
/// |   0 | 0x00 | Success                       | The message is accepted. Publication of the QoS 1 message proceeds.                                                |
/// |  16 | 0x10 | No matching subscribers       | The message is accepted but there are no subscribers. This is sent only by the Server.                             |
/// |     |      |                               | If the Server knows that there are no matching subscribers, it MAY use this Reason Code instead of 0x00 (Success). |
/// | 128 | 0x80 | Unspecified error             | The receiver does not accept the publish but either does not want to reveal the reason,                            |
/// |     |      |                               | or it does not match one of the other values.                                                                      |
/// | 131 | 0x83 | Implementation specific error | The PUBLISH is valid but the receiver is not willing to accept it.                                                 |
/// | 135 | 0x87 | Not authorized                | The PUBLISH is not authorized.                                                                                     |
/// | 144 | 0x90 | Topic Name invalid            | The Topic Name is not malformed, but is not accepted by this Client or Server.                                     |
/// | 145 | 0x91 | Packet identifier in use      | The Packet Identifier is already in use.                                                                           |
/// |     |      |                               | This might indicate a mismatch in the Session State between the Client and Server.                                 |
/// | 151 | 0x97 | Quota exceeded                | An implementation or administrative imposed limit has been exceeded.                                               |
/// | 153 | 0x99 | Payload format invalid        | The payload format does not match the specified Payload Format Indicator.                                          |
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum PubackReasonCode {
    Success = 0x00,
    NoMatchingSubscribers = 0x10,
    UnspecifiedError = 0x80,
    ImplementationSpecificError = 0x83,
    NotAuthorized = 0x87,
    TopicNameInvalid = 0x90,
    PacketIdentifierInUse = 0x91,
    QuotaExceeded = 0x97,
    PayloadFormatInvalid = 0x99,
}

impl PubackReasonCode {
    /// Maps a wire byte to its reason code.
    ///
    /// Returns `None` for any byte that is not one of the discriminants
    /// declared on the enum.
    pub fn from_u8(value: u8) -> Option<Self> {
        match value {
            0x00 => Some(Self::Success),
            0x10 => Some(Self::NoMatchingSubscribers),
            0x80 => Some(Self::UnspecifiedError),
            0x83 => Some(Self::ImplementationSpecificError),
            0x87 => Some(Self::NotAuthorized),
            0x90 => Some(Self::TopicNameInvalid),
            0x91 => Some(Self::PacketIdentifierInUse),
            0x97 => Some(Self::QuotaExceeded),
            0x99 => Some(Self::PayloadFormatInvalid),
            _ => None,
        }
    }
}
374
/// Body type for PUBREC packet.
///
/// Holds a packet identifier, a reason code and a property list. The
/// `Encodable` impl omits the reason code and/or the properties when they
/// carry their default values.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct Pubrec {
    /// Packet identifier; always the first two bytes of the encoded body.
    pub pid: Pid,
    /// Reason code; not written when it is `Success` and `properties` is default.
    pub reason_code: PubrecReasonCode,
    /// Property list; not written when equal to `PubrecProperties::default()`.
    pub properties: PubrecProperties,
}
383
384impl Pubrec {
385    pub fn new(pid: Pid, reason_code: PubrecReasonCode) -> Self {
386        Pubrec {
387            pid,
388            reason_code,
389            properties: PubrecProperties::default(),
390        }
391    }
392
393    pub fn new_success(pid: Pid) -> Self {
394        Self::new(pid, PubrecReasonCode::Success)
395    }
396
397    pub async fn decode_async<T: AsyncRead + Unpin>(
398        reader: &mut T,
399        header: Header,
400    ) -> Result<Self, ErrorV5> {
401        let pid = Pid::try_from(read_u16(reader).await?)?;
402        let (reason_code, properties) = if header.remaining_len == 2 {
403            (PubrecReasonCode::Success, PubrecProperties::default())
404        } else if header.remaining_len == 3 {
405            let reason_byte = read_u8(reader).await?;
406            let reason_code = PubrecReasonCode::from_u8(reason_byte)
407                .ok_or(ErrorV5::InvalidReasonCode(header.typ, reason_byte))?;
408            (reason_code, PubrecProperties::default())
409        } else {
410            let reason_byte = read_u8(reader).await?;
411            let reason_code = PubrecReasonCode::from_u8(reason_byte)
412                .ok_or(ErrorV5::InvalidReasonCode(header.typ, reason_byte))?;
413            let properties = PubrecProperties::decode_async(reader, header.typ).await?;
414            (reason_code, properties)
415        };
416        Ok(Pubrec {
417            pid,
418            reason_code,
419            properties,
420        })
421    }
422}
423
424impl Encodable for Pubrec {
425    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
426        write_u16(writer, self.pid.value())?;
427        if self.properties == PubrecProperties::default() {
428            if self.reason_code != PubrecReasonCode::Success {
429                write_u8(writer, self.reason_code as u8)?;
430            }
431        } else {
432            write_u8(writer, self.reason_code as u8)?;
433            self.properties.encode(writer)?;
434        }
435        Ok(())
436    }
437
438    fn encode_len(&self) -> usize {
439        if self.properties == PubrecProperties::default() {
440            if self.reason_code == PubrecReasonCode::Success {
441                2
442            } else {
443                3
444            }
445        } else {
446            3 + self.properties.encode_len()
447        }
448    }
449}
450
/// Property list for PUBREC packet.
///
/// Both fields are empty by default (`None` / empty vector).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct PubrecProperties {
    /// Optional reason string (the `ReasonString` property named in the codec macros).
    pub reason_string: Option<Arc<String>>,
    /// User properties; empty by default.
    pub user_properties: Vec<UserProperty>,
}
458
impl PubrecProperties {
    /// Decodes the PUBREC property list from `reader`.
    ///
    /// Starts from `PubrecProperties::default()` and delegates parsing to the
    /// `decode_properties!` macro with `ReasonString` as the only listed
    /// property; `packet_type` is passed through to the macro.
    // NOTE(review): `user_properties` is not listed, so the macro presumably
    // fills it implicitly — confirm against the macro definition.
    pub async fn decode_async<T: AsyncRead + Unpin>(
        reader: &mut T,
        packet_type: PacketType,
    ) -> Result<Self, ErrorV5> {
        let mut properties = PubrecProperties::default();
        decode_properties!(packet_type, properties, reader, ReasonString,);
        Ok(properties)
    }
}
469
impl Encodable for PubrecProperties {
    /// Writes the property list to `writer` through the `encode_properties!`
    /// macro (listed property: `ReasonString`).
    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
        encode_properties!(self, writer, ReasonString,);
        Ok(())
    }
    /// Returns the encoded size accumulated by `encode_properties_len!`.
    // NOTE(review): presumably matches the byte count written by `encode` —
    // confirm against the macro definition.
    fn encode_len(&self) -> usize {
        let mut len = 0;
        encode_properties_len!(self, len, ReasonString,);
        len
    }
}
481
/// Reason code for PUBREC packet.
///
/// | Dec |  Hex | Reason Code name              | Description                                                                                                        |
/// |-----|------|-------------------------------|--------------------------------------------------------------------------------------------------------------------|
/// |   0 | 0x00 | Success                       | The message is accepted. Publication of the QoS 2 message proceeds.                                                |
/// |  16 | 0x10 | No matching subscribers       | The message is accepted but there are no subscribers. This is sent only by the Server.                             |
/// |     |      |                               | If the Server knows that there are no matching subscribers, it MAY use this Reason Code instead of 0x00 (Success). |
/// | 128 | 0x80 | Unspecified error             | The receiver does not accept the publish but either does not want to reveal the reason,                            |
/// |     |      |                               | or it does not match one of the other values.                                                                      |
/// | 131 | 0x83 | Implementation specific error | The PUBLISH is valid but the receiver is not willing to accept it.                                                 |
/// | 135 | 0x87 | Not authorized                | The PUBLISH is not authorized.                                                                                     |
/// | 144 | 0x90 | Topic Name invalid            | The Topic Name is not malformed, but is not accepted by this Client or Server.                                     |
/// | 145 | 0x91 | Packet identifier in use      | The Packet Identifier is already in use.                                                                           |
/// |     |      |                               | This might indicate a mismatch in the Session State between the Client and Server.                                 |
/// | 151 | 0x97 | Quota exceeded                | An implementation or administrative imposed limit has been exceeded.                                               |
/// | 153 | 0x99 | Payload format invalid        | The payload format does not match the specified Payload Format Indicator.                                          |
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum PubrecReasonCode {
    Success = 0x00,
    NoMatchingSubscribers = 0x10,
    UnspecifiedError = 0x80,
    ImplementationSpecificError = 0x83,
    NotAuthorized = 0x87,
    TopicNameInvalid = 0x90,
    PacketIdentifierInUse = 0x91,
    QuotaExceeded = 0x97,
    PayloadFormatInvalid = 0x99,
}

impl PubrecReasonCode {
    /// Maps a wire byte to its reason code.
    ///
    /// Returns `None` for any byte that is not one of the discriminants
    /// declared on the enum.
    pub fn from_u8(value: u8) -> Option<Self> {
        match value {
            0x00 => Some(Self::Success),
            0x10 => Some(Self::NoMatchingSubscribers),
            0x80 => Some(Self::UnspecifiedError),
            0x83 => Some(Self::ImplementationSpecificError),
            0x87 => Some(Self::NotAuthorized),
            0x90 => Some(Self::TopicNameInvalid),
            0x91 => Some(Self::PacketIdentifierInUse),
            0x97 => Some(Self::QuotaExceeded),
            0x99 => Some(Self::PayloadFormatInvalid),
            _ => None,
        }
    }
}
530
/// Body type for PUBREL packet.
///
/// Holds a packet identifier, a reason code and a property list. The
/// `Encodable` impl omits the reason code and/or the properties when they
/// carry their default values.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct Pubrel {
    /// Packet identifier; always the first two bytes of the encoded body.
    pub pid: Pid,
    /// Reason code; not written when it is `Success` and `properties` is default.
    pub reason_code: PubrelReasonCode,
    /// Property list; not written when equal to `PubrelProperties::default()`.
    pub properties: PubrelProperties,
}
539
540impl Pubrel {
541    pub fn new(pid: Pid, reason_code: PubrelReasonCode) -> Self {
542        Pubrel {
543            pid,
544            reason_code,
545            properties: PubrelProperties::default(),
546        }
547    }
548
549    pub fn new_success(pid: Pid) -> Self {
550        Self::new(pid, PubrelReasonCode::Success)
551    }
552
553    pub async fn decode_async<T: AsyncRead + Unpin>(
554        reader: &mut T,
555        header: Header,
556    ) -> Result<Self, ErrorV5> {
557        let pid = Pid::try_from(read_u16(reader).await?)?;
558        let (reason_code, properties) = if header.remaining_len == 2 {
559            (PubrelReasonCode::Success, PubrelProperties::default())
560        } else if header.remaining_len == 3 {
561            let reason_byte = read_u8(reader).await?;
562            let reason_code = PubrelReasonCode::from_u8(reason_byte)
563                .ok_or(ErrorV5::InvalidReasonCode(header.typ, reason_byte))?;
564            (reason_code, PubrelProperties::default())
565        } else {
566            let reason_byte = read_u8(reader).await?;
567            let reason_code = PubrelReasonCode::from_u8(reason_byte)
568                .ok_or(ErrorV5::InvalidReasonCode(header.typ, reason_byte))?;
569            let properties = PubrelProperties::decode_async(reader, header.typ).await?;
570            (reason_code, properties)
571        };
572        Ok(Pubrel {
573            pid,
574            reason_code,
575            properties,
576        })
577    }
578}
579
580impl Encodable for Pubrel {
581    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
582        write_u16(writer, self.pid.value())?;
583        if self.properties == PubrelProperties::default() {
584            if self.reason_code != PubrelReasonCode::Success {
585                write_u8(writer, self.reason_code as u8)?;
586            }
587        } else {
588            write_u8(writer, self.reason_code as u8)?;
589            self.properties.encode(writer)?;
590        }
591        Ok(())
592    }
593
594    fn encode_len(&self) -> usize {
595        if self.properties == PubrelProperties::default() {
596            if self.reason_code == PubrelReasonCode::Success {
597                2
598            } else {
599                3
600            }
601        } else {
602            3 + self.properties.encode_len()
603        }
604    }
605}
606
/// Property list for PUBREL packet.
///
/// Both fields are empty by default (`None` / empty vector).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct PubrelProperties {
    /// Optional reason string (the `ReasonString` property named in the codec macros).
    pub reason_string: Option<Arc<String>>,
    /// User properties; empty by default.
    pub user_properties: Vec<UserProperty>,
}
614
impl PubrelProperties {
    /// Decodes the PUBREL property list from `reader`.
    ///
    /// Starts from `PubrelProperties::default()` and delegates parsing to the
    /// `decode_properties!` macro with `ReasonString` as the only listed
    /// property; `packet_type` is passed through to the macro.
    // NOTE(review): `user_properties` is not listed, so the macro presumably
    // fills it implicitly — confirm against the macro definition.
    pub async fn decode_async<T: AsyncRead + Unpin>(
        reader: &mut T,
        packet_type: PacketType,
    ) -> Result<Self, ErrorV5> {
        let mut properties = PubrelProperties::default();
        decode_properties!(packet_type, properties, reader, ReasonString,);
        Ok(properties)
    }
}
625
impl Encodable for PubrelProperties {
    /// Writes the property list to `writer` through the `encode_properties!`
    /// macro (listed property: `ReasonString`).
    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
        encode_properties!(self, writer, ReasonString,);
        Ok(())
    }
    /// Returns the encoded size accumulated by `encode_properties_len!`.
    // NOTE(review): presumably matches the byte count written by `encode` —
    // confirm against the macro definition.
    fn encode_len(&self) -> usize {
        let mut len = 0;
        encode_properties_len!(self, len, ReasonString,);
        len
    }
}
637
/// Reason code for PUBREL packet.
///
/// | Dec |  Hex | Reason Code name            | Description                                                                                 |
/// |-----|------|-----------------------------|---------------------------------------------------------------------------------------------|
/// |   0 | 0x00 | Success                     | Message released.                                                                           |
/// | 146 | 0x92 | Packet Identifier not found | The Packet Identifier is not known. This is not an error during recovery,                   |
/// |     |      |                             | but at other times indicates a mismatch between the Session State on the Client and Server. |
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum PubrelReasonCode {
    Success = 0x00,
    PacketIdentifierNotFound = 0x92,
}

impl PubrelReasonCode {
    /// Maps a wire byte to its reason code.
    ///
    /// Returns `None` for any byte that is not one of the discriminants
    /// declared on the enum.
    pub fn from_u8(value: u8) -> Option<Self> {
        match value {
            0x00 => Some(Self::Success),
            0x92 => Some(Self::PacketIdentifierNotFound),
            _ => None,
        }
    }
}
663
/// Body type for PUBCOMP packet.
///
/// Holds a packet identifier, a reason code and a property list. The
/// `Encodable` impl omits the reason code and/or the properties when they
/// carry their default values.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct Pubcomp {
    /// Packet identifier; always the first two bytes of the encoded body.
    pub pid: Pid,
    /// Reason code; not written when it is `Success` and `properties` is default.
    pub reason_code: PubcompReasonCode,
    /// Property list; not written when equal to `PubcompProperties::default()`.
    pub properties: PubcompProperties,
}
672
673impl Pubcomp {
674    pub fn new(pid: Pid, reason_code: PubcompReasonCode) -> Self {
675        Pubcomp {
676            pid,
677            reason_code,
678            properties: PubcompProperties::default(),
679        }
680    }
681
682    pub fn new_success(pid: Pid) -> Self {
683        Self::new(pid, PubcompReasonCode::Success)
684    }
685
686    pub async fn decode_async<T: AsyncRead + Unpin>(
687        reader: &mut T,
688        header: Header,
689    ) -> Result<Self, ErrorV5> {
690        let pid = Pid::try_from(read_u16(reader).await?)?;
691        let (reason_code, properties) = if header.remaining_len == 2 {
692            (PubcompReasonCode::Success, PubcompProperties::default())
693        } else if header.remaining_len == 3 {
694            let reason_byte = read_u8(reader).await?;
695            let reason_code = PubcompReasonCode::from_u8(reason_byte)
696                .ok_or(ErrorV5::InvalidReasonCode(header.typ, reason_byte))?;
697            (reason_code, PubcompProperties::default())
698        } else {
699            let reason_byte = read_u8(reader).await?;
700            let reason_code = PubcompReasonCode::from_u8(reason_byte)
701                .ok_or(ErrorV5::InvalidReasonCode(header.typ, reason_byte))?;
702            let properties = PubcompProperties::decode_async(reader, header.typ).await?;
703            (reason_code, properties)
704        };
705        Ok(Pubcomp {
706            pid,
707            reason_code,
708            properties,
709        })
710    }
711}
712
713impl Encodable for Pubcomp {
714    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
715        write_u16(writer, self.pid.value())?;
716        if self.properties == PubcompProperties::default() {
717            if self.reason_code != PubcompReasonCode::Success {
718                write_u8(writer, self.reason_code as u8)?;
719            }
720        } else {
721            write_u8(writer, self.reason_code as u8)?;
722            self.properties.encode(writer)?;
723        }
724        Ok(())
725    }
726
727    fn encode_len(&self) -> usize {
728        if self.properties == PubcompProperties::default() {
729            if self.reason_code == PubcompReasonCode::Success {
730                2
731            } else {
732                3
733            }
734        } else {
735            3 + self.properties.encode_len()
736        }
737    }
738}
739
/// Property list for PUBCOMP packet.
///
/// Both fields are empty by default (`None` / empty vector).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct PubcompProperties {
    /// Optional reason string (the `ReasonString` property named in the codec macros).
    pub reason_string: Option<Arc<String>>,
    /// User properties; empty by default.
    pub user_properties: Vec<UserProperty>,
}
747
impl PubcompProperties {
    /// Decodes the PUBCOMP property list from `reader`.
    ///
    /// Starts from `PubcompProperties::default()` and delegates parsing to the
    /// `decode_properties!` macro with `ReasonString` as the only listed
    /// property; `packet_type` is passed through to the macro.
    // NOTE(review): `user_properties` is not listed, so the macro presumably
    // fills it implicitly — confirm against the macro definition.
    pub async fn decode_async<T: AsyncRead + Unpin>(
        reader: &mut T,
        packet_type: PacketType,
    ) -> Result<Self, ErrorV5> {
        let mut properties = PubcompProperties::default();
        decode_properties!(packet_type, properties, reader, ReasonString,);
        Ok(properties)
    }
}
758
impl Encodable for PubcompProperties {
    /// Writes the property list to `writer` through the `encode_properties!`
    /// macro (listed property: `ReasonString`).
    fn encode<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
        encode_properties!(self, writer, ReasonString,);
        Ok(())
    }
    /// Returns the encoded size accumulated by `encode_properties_len!`.
    // NOTE(review): presumably matches the byte count written by `encode` —
    // confirm against the macro definition.
    fn encode_len(&self) -> usize {
        let mut len = 0;
        encode_properties_len!(self, len, ReasonString,);
        len
    }
}
770
/// Reason code for PUBCOMP packet.
///
/// | Dec |  Hex | Reason Code name            | Description                                                                                 |
/// |-----|------|-----------------------------|---------------------------------------------------------------------------------------------|
/// |   0 | 0x00 | Success                     | Packet Identifier released. Publication of QoS 2 message is complete.                       |
/// | 146 | 0x92 | Packet Identifier not found | The Packet Identifier is not known. This is not an error during recovery,                   |
/// |     |      |                             | but at other times indicates a mismatch between the Session State on the Client and Server. |
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum PubcompReasonCode {
    Success = 0x00,
    PacketIdentifierNotFound = 0x92,
}

impl PubcompReasonCode {
    /// Maps a wire byte to its reason code.
    ///
    /// Returns `None` for any byte that is not one of the discriminants
    /// declared on the enum.
    pub fn from_u8(value: u8) -> Option<Self> {
        match value {
            0x00 => Some(Self::Success),
            0x92 => Some(Self::PacketIdentifierNotFound),
            _ => None,
        }
    }
}