ntex_mqtt/v5/codec/packet/
pubacks.rs

1use std::num::NonZeroU16;
2
3use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut};
4
5use super::ack_props;
6use crate::error::{DecodeError, EncodeError};
7use crate::utils::{Decode, Encode};
8use crate::v5::codec::{encode::*, UserProperties};
9
/// Number of bytes written ahead of the properties by the encoders in this
/// module: a 2-byte packet identifier followed by a 1-byte reason code.
const HEADER_LEN: u32 = 2 + 1; // packet id + reason code
11
/// PUBACK/PUBREC message content
///
/// When decoding, the reason code and the properties are optional: `decode`
/// fills whatever is missing with the values from the `Default` impl.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PublishAck {
    /// Packet Identifier
    pub packet_id: NonZeroU16,
    /// Reason code; `PublishAckReason::Success` when absent from the decoded data
    pub reason_code: PublishAckReason,
    /// User properties carried by the acknowledgement
    pub properties: UserProperties,
    /// Optional reason string carried by the acknowledgement
    pub reason_string: Option<ByteString>,
}
21
/// PUBREL/PUBCOMP message content
///
/// When decoding, the reason code and the properties are optional: `decode`
/// fills whatever is missing with the values from the `Default` impl.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PublishAck2 {
    /// Packet Identifier
    pub packet_id: NonZeroU16,
    /// Reason code; `PublishAck2Reason::Success` when absent from the decoded data
    pub reason_code: PublishAck2Reason,
    /// User properties carried by the acknowledgement
    pub properties: UserProperties,
    /// Optional reason string carried by the acknowledgement
    pub reason_string: Option<ByteString>,
}
31
prim_enum! {
    /// PUBACK / PUBREC reason codes
    pub enum PublishAckReason {
        // The numeric value of each variant is the single byte that `decode`
        // reads and `encode` writes for the reason code; the trailing comments
        // give the same value in hex.
        Success = 0, // 0x00
        NoMatchingSubscribers = 16, // 0x10
        UnspecifiedError = 128, // 0x80
        ImplementationSpecificError = 131, // 0x83
        NotAuthorized = 135, // 0x87
        TopicNameInvalid = 144, // 0x90
        PacketIdentifierInUse = 145, // 0x91
        QuotaExceeded = 151, // 0x97
        PayloadFormatInvalid = 153 // 0x99
    }
}
46
prim_enum! {
    /// PUBREL / PUBCOMP reason codes
    pub enum PublishAck2Reason {
        // The numeric value of each variant is converted from/to the single
        // reason-code byte in `decode`/`encode`; hex form in the trailing comments.
        Success = 0, // 0x00
        PacketIdNotFound = 146 // 0x92
    }
}
54
55impl PublishAck {
56    pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
57        let packet_id = NonZeroU16::decode(src)?;
58
59        let ack = if src.has_remaining() {
60            let reason_code = src.get_u8().try_into()?;
61            if src.has_remaining() {
62                let (properties, reason_string) = ack_props::decode(src)?;
63                ensure!(!src.has_remaining(), DecodeError::InvalidLength); // no data should be left in src
64                Self { packet_id, reason_code, properties, reason_string }
65            } else {
66                Self { packet_id, reason_code, ..Default::default() }
67            }
68        } else {
69            Self { packet_id, ..Default::default() }
70        };
71
72        Ok(ack)
73    }
74}
75
76impl Default for PublishAck {
77    fn default() -> Self {
78        Self {
79            packet_id: NonZeroU16::new(1).unwrap(),
80            reason_code: PublishAckReason::Success,
81            properties: UserProperties::default(),
82            reason_string: None,
83        }
84    }
85}
86
87impl PublishAck2 {
88    pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
89        let packet_id = NonZeroU16::decode(src)?;
90        let ack = if src.has_remaining() {
91            let reason_code = src.get_u8().try_into()?;
92            if src.has_remaining() {
93                let (properties, reason_string) = ack_props::decode(src)?;
94                ensure!(!src.has_remaining(), DecodeError::InvalidLength); // no data should be left in src
95                Self { packet_id, reason_code, properties, reason_string }
96            } else {
97                Self { packet_id, reason_code, ..Default::default() }
98            }
99        } else {
100            Self { packet_id, ..Default::default() }
101        };
102
103        Ok(ack)
104    }
105}
106
107impl Default for PublishAck2 {
108    fn default() -> Self {
109        Self {
110            packet_id: NonZeroU16::new(1).unwrap(),
111            reason_code: PublishAck2Reason::Success,
112            properties: UserProperties::default(),
113            reason_string: None,
114        }
115    }
116}
117
118impl EncodeLtd for PublishAck {
119    fn encoded_size(&self, limit: u32) -> usize {
120        let prop_len = ack_props::encoded_size(
121            &self.properties,
122            &self.reason_string,
123            limit - HEADER_LEN - 4,
124        ); // limit - HEADER_LEN - len(packet_len.max())
125        HEADER_LEN as usize + prop_len
126    }
127
128    fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
129        self.packet_id.get().encode(buf)?;
130        buf.put_u8(self.reason_code.into());
131        ack_props::encode(&self.properties, &self.reason_string, buf, size - HEADER_LEN)?;
132        Ok(())
133    }
134}
135
136impl EncodeLtd for PublishAck2 {
137    fn encoded_size(&self, limit: u32) -> usize {
138        const HEADER_LEN: u32 = 2 + 1; // fixed header + packet id + reason code
139        let prop_len = ack_props::encoded_size(
140            &self.properties,
141            &self.reason_string,
142            limit - HEADER_LEN - 4,
143        ); // limit - HEADER_LEN - prop_len.max()
144        HEADER_LEN as usize + prop_len
145    }
146
147    fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
148        self.packet_id.get().encode(buf)?;
149        buf.put_u8(self.reason_code.into());
150        ack_props::encode(&self.properties, &self.reason_string, buf, size - 3)?;
151        Ok(())
152    }
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use test_case::test_case;

    // Each case decodes a complete PUBACK body and compares every field of the
    // result with the expected values; the input must be consumed entirely.
    #[test_case(b"\xFF\xFF\x00\x00", 65535, PublishAckReason::Success, vec![], None; "success_empty")]
    #[test_case(b"\x00\x01", 1, PublishAckReason::Success, vec![], None; "success_no_reason")]
    #[test_case(b"\x01\x01\x00", 257, PublishAckReason::Success, vec![], None; "success_no_prop_len")]
    #[test_case(b"\x00\x01\x87", 1, PublishAckReason::NotAuthorized, vec![], None; "no_success_no_prop_len")]
    #[test_case(b"\x00\x01\x83\x00", 1, PublishAckReason::ImplementationSpecificError, vec![], None; "no_success_min")]
    #[test_case(b"\x00\xFF\x80\x0D\x26\x00\x01a\x00\x01b\x1F\x00\x03123", 255, PublishAckReason::UnspecifiedError, vec![("a", "b")], Some("123"); "all_out")]
    fn puback_decode_success(
        input: &'static [u8],
        packet_id: u16,
        reason_code: PublishAckReason,
        properties: Vec<(&'static str, &'static str)>,
        reason_string: Option<&'static str>,
    ) {
        let mut buf: Bytes = input.into();
        let decoded = PublishAck::decode(&mut buf);

        // Convert the plain test fixtures into the types stored in `PublishAck`.
        let user_props: UserProperties =
            properties.into_iter().map(|(key, value)| (key.into(), value.into())).collect();
        let expected = PublishAck {
            packet_id: packet_id.try_into().unwrap(),
            reason_code,
            properties: user_props,
            reason_string: reason_string.map(Into::into),
        };

        assert_eq!(decoded, Ok(expected));
        assert_eq!(buf.len(), 0);
    }

    // Each case is a body that `PublishAck::decode` must reject with the given error.
    #[test_case(b"\x00\x00", DecodeError::MalformedPacket; "packet_id_zero")]
    #[test_case(b"\x00\x01\x00\x01", DecodeError::InvalidLength; "properties_promised")]
    fn puback_decode_must_fail(input: &'static [u8], error: DecodeError) {
        let mut buf: Bytes = input.into();
        let outcome = PublishAck::decode(&mut buf);
        assert_eq!(outcome, Err(error));
    }
}