rmqtt_codec/v5/packet/
pubacks.rs

1use std::num::NonZeroU16;
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4use bytestring::ByteString;
5use serde::{Deserialize, Serialize};
6
7use super::ack_props;
8use crate::error::{DecodeError, EncodeError};
9use crate::utils::{Decode, Encode};
10use crate::v5::{encode::*, DisconnectReasonCode, ToReasonCode, UserProperties};
11
/// Number of bytes that precede the properties in an acknowledgement body:
/// the two-byte packet identifier followed by the one-byte reason code.
const HEADER_LEN: u32 = {
    const PACKET_ID_LEN: u32 = 2;
    const REASON_CODE_LEN: u32 = 1;
    PACKET_ID_LEN + REASON_CODE_LEN
};
13
14/// PUBACK/PUBREC message content
15#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
16pub struct PublishAck {
17    /// Packet Identifier
18    pub packet_id: NonZeroU16,
19    pub reason_code: PublishAckReason,
20    pub properties: UserProperties,
21    pub reason_string: Option<ByteString>,
22}
23
24/// PUBREL/PUBCOMP message content
25#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
26pub struct PublishAck2 {
27    /// Packet Identifier
28    pub packet_id: NonZeroU16,
29    pub reason_code: PublishAck2Reason,
30    pub properties: UserProperties,
31    pub reason_string: Option<ByteString>,
32}
33
34prim_enum! {
35    /// PUBACK / PUBREC reason codes
36    #[derive(Deserialize, Serialize)]
37    pub enum PublishAckReason {
38        Success = 0,
39        NoMatchingSubscribers = 16,
40        UnspecifiedError = 128,
41        ImplementationSpecificError = 131,
42        NotAuthorized = 135,
43        TopicNameInvalid = 144,
44        PacketIdentifierInUse = 145,
45        QuotaExceeded = 151,
46        PayloadFormatInvalid = 153
47    }
48}
49
50impl PublishAckReason {
51    #[inline]
52    pub fn should_disconnect(&self) -> bool {
53        matches!(
54            self,
55            PublishAckReason::UnspecifiedError
56                | PublishAckReason::ImplementationSpecificError
57                | PublishAckReason::NotAuthorized
58                | PublishAckReason::TopicNameInvalid
59        )
60    }
61}
62
63impl ToReasonCode for PublishAckReason {
64    fn to_reason_code(&self) -> DisconnectReasonCode {
65        match self {
66            PublishAckReason::Success | PublishAckReason::NoMatchingSubscribers => {
67                DisconnectReasonCode::NormalDisconnection
68            }
69            PublishAckReason::UnspecifiedError => DisconnectReasonCode::UnspecifiedError,
70            PublishAckReason::ImplementationSpecificError => {
71                DisconnectReasonCode::ImplementationSpecificError
72            }
73            PublishAckReason::NotAuthorized => DisconnectReasonCode::NotAuthorized,
74            PublishAckReason::TopicNameInvalid => DisconnectReasonCode::TopicNameInvalid,
75            PublishAckReason::PacketIdentifierInUse => DisconnectReasonCode::ProtocolError,
76            PublishAckReason::QuotaExceeded => DisconnectReasonCode::QuotaExceeded,
77            PublishAckReason::PayloadFormatInvalid => DisconnectReasonCode::PayloadFormatInvalid,
78        }
79    }
80}
81
82impl From<PublishAckReason> for u8 {
83    fn from(v: PublishAckReason) -> Self {
84        match v {
85            PublishAckReason::Success => 0,
86            PublishAckReason::NoMatchingSubscribers => 16,
87            PublishAckReason::UnspecifiedError => 128,
88            PublishAckReason::ImplementationSpecificError => 131,
89            PublishAckReason::NotAuthorized => 135,
90            PublishAckReason::TopicNameInvalid => 144,
91            PublishAckReason::PacketIdentifierInUse => 145,
92            PublishAckReason::QuotaExceeded => 151,
93            PublishAckReason::PayloadFormatInvalid => 153,
94        }
95    }
96}
97
98prim_enum! {
99    /// PUBREL / PUBCOMP reason codes
100    #[derive(Deserialize, Serialize)]
101    pub enum PublishAck2Reason {
102        Success = 0,
103        PacketIdNotFound = 146
104    }
105}
106
107impl PublishAck2Reason {
108    #[inline]
109    pub fn should_disconnect(&self) -> bool {
110        matches!(self, PublishAck2Reason::PacketIdNotFound)
111    }
112}
113
114impl ToReasonCode for PublishAck2Reason {
115    fn to_reason_code(&self) -> DisconnectReasonCode {
116        match self {
117            PublishAck2Reason::Success => DisconnectReasonCode::NormalDisconnection,
118            PublishAck2Reason::PacketIdNotFound => DisconnectReasonCode::ImplementationSpecificError,
119        }
120    }
121}
122
123impl From<PublishAck2Reason> for u8 {
124    fn from(v: PublishAck2Reason) -> Self {
125        match v {
126            PublishAck2Reason::Success => 0,
127            PublishAck2Reason::PacketIdNotFound => 146,
128        }
129    }
130}
131
132impl PublishAck {
133    pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
134        let packet_id = NonZeroU16::decode(src)?;
135
136        let ack = if src.has_remaining() {
137            let reason_code = src.get_u8().try_into()?;
138            if src.has_remaining() {
139                let (properties, reason_string) = ack_props::decode(src)?;
140                ensure!(!src.has_remaining(), DecodeError::InvalidLength); // no data should be left in src
141                Self { packet_id, reason_code, properties, reason_string }
142            } else {
143                Self { packet_id, reason_code, ..Default::default() }
144            }
145        } else {
146            Self { packet_id, ..Default::default() }
147        };
148
149        Ok(ack)
150    }
151}
152
153impl Default for PublishAck {
154    fn default() -> Self {
155        Self {
156            packet_id: NonZeroU16::new(1).unwrap(),
157            reason_code: PublishAckReason::Success,
158            properties: UserProperties::default(),
159            reason_string: None,
160        }
161    }
162}
163
164impl PublishAck2 {
165    pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
166        let packet_id = NonZeroU16::decode(src)?;
167        let ack = if src.has_remaining() {
168            let reason_code = src.get_u8().try_into()?;
169            if src.has_remaining() {
170                let (properties, reason_string) = ack_props::decode(src)?;
171                ensure!(!src.has_remaining(), DecodeError::InvalidLength); // no data should be left in src
172                Self { packet_id, reason_code, properties, reason_string }
173            } else {
174                Self { packet_id, reason_code, ..Default::default() }
175            }
176        } else {
177            Self { packet_id, ..Default::default() }
178        };
179
180        Ok(ack)
181    }
182}
183
184impl Default for PublishAck2 {
185    fn default() -> Self {
186        Self {
187            packet_id: NonZeroU16::new(1).unwrap(),
188            reason_code: PublishAck2Reason::Success,
189            properties: UserProperties::default(),
190            reason_string: None,
191        }
192    }
193}
194
195impl EncodeLtd for PublishAck {
196    fn encoded_size(&self, limit: u32) -> usize {
197        let prop_len = ack_props::encoded_size(&self.properties, &self.reason_string, limit - HEADER_LEN - 4); // limit - HEADER_LEN - len(packet_len.max())
198        HEADER_LEN as usize + prop_len
199    }
200
201    fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
202        self.packet_id.get().encode(buf)?;
203        buf.put_u8(self.reason_code.into());
204        ack_props::encode(&self.properties, &self.reason_string, buf, size - HEADER_LEN)?;
205        Ok(())
206    }
207}
208
209impl EncodeLtd for PublishAck2 {
210    fn encoded_size(&self, limit: u32) -> usize {
211        const HEADER_LEN: u32 = 2 + 1; // fixed header + packet id + reason code
212        let prop_len = ack_props::encoded_size(&self.properties, &self.reason_string, limit - HEADER_LEN - 4); // limit - HEADER_LEN - prop_len.max()
213        HEADER_LEN as usize + prop_len
214    }
215
216    fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
217        self.packet_id.get().encode(buf)?;
218        buf.put_u8(self.reason_code.into());
219        ack_props::encode(&self.properties, &self.reason_string, buf, size - 3)?;
220        Ok(())
221    }
222}
223
#[cfg(test)]
mod tests {
    use super::*;
    use test_case::test_case;

    // Well-formed PUBACK bodies: each case lists the raw bytes followed by the
    // fields the decoder is expected to produce.
    #[test_case(b"\xFF\xFF\x00\x00", 65535, PublishAckReason::Success, vec![], None; "success_empty")]
    #[test_case(b"\x00\x01", 1, PublishAckReason::Success, vec![], None; "success_no_reason")]
    #[test_case(b"\x01\x01\x00", 257, PublishAckReason::Success, vec![], None; "success_no_prop_len")]
    #[test_case(b"\x00\x01\x87", 1, PublishAckReason::NotAuthorized, vec![], None; "no_success_no_prop_len")]
    #[test_case(b"\x00\x01\x83\x00", 1, PublishAckReason::ImplementationSpecificError, vec![], None; "no_success_min")]
    #[test_case(b"\x00\xFF\x80\x0D\x26\x00\x01a\x00\x01b\x1F\x00\x03123", 255, PublishAckReason::UnspecifiedError, vec![("a", "b")], Some("123"); "all_out")]
    fn puback_decode_success(
        input: &'static [u8],
        packet_id: u16,
        reason_code: PublishAckReason,
        properties: Vec<(&'static str, &'static str)>,
        reason_string: Option<&'static str>,
    ) {
        let id: NonZeroU16 = packet_id.try_into().unwrap();
        let expected = PublishAck {
            packet_id: id,
            reason_code,
            properties: properties.into_iter().map(|(k, v)| (k.into(), v.into())).collect(),
            reason_string: reason_string.map(Into::into),
        };

        let mut buf = Bytes::from_static(input);
        let decoded = PublishAck::decode(&mut buf).unwrap();

        assert_eq!(decoded, expected);
        // The decoder must have consumed every input byte.
        assert_eq!(buf.len(), 0);
    }

    // Malformed PUBACK bodies: errors are compared through their display text.
    #[test_case(b"\x00\x00", DecodeError::MalformedPacket; "packet_id_zero")]
    #[test_case(b"\x00\x01\x00\x01", DecodeError::InvalidLength; "properties_promised")]
    fn puback_decode_must_fail(input: &'static [u8], error: DecodeError) {
        let mut buf = Bytes::from_static(input);
        let outcome = PublishAck::decode(&mut buf).map_err(|e| e.to_string());
        assert_eq!(outcome, Err(error.to_string()));
    }
}
263}