Skip to main content

rmqtt_codec/v5/packet/
connack.rs

1use std::num::NonZeroU16;
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4use bytestring::ByteString;
5use serde::{Deserialize, Serialize};
6
7use crate::error::{DecodeError, EncodeError};
8use crate::types::{ConnectAckFlags, QoS};
9use crate::utils::{self, Decode, Encode, Property};
10use crate::v5::RECEIVE_MAX_DEFAULT;
11use crate::v5::{encode::*, property_type as pt, UserProperties, UserProperty};
12
13/// Connect acknowledgment packet
14#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
15pub struct ConnectAck {
16    /// enables a Client to establish whether the Client and Server have a consistent view
17    /// about whether there is already stored Session state.
18    pub session_present: bool,
19    pub reason_code: ConnectAckReason,
20
21    pub session_expiry_interval_secs: Option<u32>,
22    pub receive_max: NonZeroU16,
23    pub max_qos: QoS,
24    pub max_packet_size: Option<u32>,
25    pub assigned_client_id: Option<ByteString>,
26    pub topic_alias_max: u16,
27    pub retain_available: bool,
28    pub wildcard_subscription_available: bool,
29    pub subscription_identifiers_available: bool,
30    pub shared_subscription_available: bool,
31    pub server_keepalive_sec: Option<u16>,
32    pub response_info: Option<ByteString>,
33    pub server_reference: Option<ByteString>,
34    pub auth_method: Option<ByteString>,
35    pub auth_data: Option<Bytes>,
36    pub reason_string: Option<ByteString>,
37    pub user_properties: UserProperties,
38}
39
40impl Default for ConnectAck {
41    fn default() -> ConnectAck {
42        ConnectAck {
43            session_present: false,
44            reason_code: ConnectAckReason::Success,
45            session_expiry_interval_secs: None,
46            receive_max: RECEIVE_MAX_DEFAULT,
47            max_qos: QoS::ExactlyOnce,
48            max_packet_size: None,
49            assigned_client_id: None,
50            topic_alias_max: 0,
51            retain_available: true,
52            wildcard_subscription_available: true,
53            subscription_identifiers_available: true,
54            shared_subscription_available: true,
55            server_keepalive_sec: None,
56            response_info: None,
57            server_reference: None,
58            auth_method: None,
59            auth_data: None,
60            reason_string: None,
61            user_properties: Vec::new(),
62        }
63    }
64}
65
66prim_enum! {
67    /// CONNACK reason codes
68    #[derive(Deserialize, Serialize)]
69    pub enum ConnectAckReason {
70        Success = 0,
71        UnspecifiedError = 128,
72        MalformedPacket = 129,
73        ProtocolError = 130,
74        ImplementationSpecificError = 131,
75        UnsupportedProtocolVersion = 132,
76        ClientIdentifierNotValid = 133,
77        BadUserNameOrPassword = 134,
78        NotAuthorized = 135,
79        ServerUnavailable = 136,
80        ServerBusy = 137,
81        Banned = 138,
82        BadAuthenticationMethod = 140,
83        TopicNameInvalid = 144,
84        PacketTooLarge = 149,
85        QuotaExceeded = 151,
86        PayloadFormatInvalid = 153,
87        RetainNotSupported = 154,
88        QosNotSupported = 155,
89        UseAnotherServer = 156,
90        ServerMoved = 157,
91        ConnectionRateExceeded = 159
92    }
93}
94
95impl From<ConnectAckReason> for u8 {
96    fn from(v: ConnectAckReason) -> Self {
97        match v {
98            ConnectAckReason::Success => 0,
99            ConnectAckReason::UnspecifiedError => 128,
100            ConnectAckReason::MalformedPacket => 129,
101            ConnectAckReason::ProtocolError => 130,
102            ConnectAckReason::ImplementationSpecificError => 131,
103            ConnectAckReason::UnsupportedProtocolVersion => 132,
104            ConnectAckReason::ClientIdentifierNotValid => 133,
105            ConnectAckReason::BadUserNameOrPassword => 134,
106            ConnectAckReason::NotAuthorized => 135,
107            ConnectAckReason::ServerUnavailable => 136,
108            ConnectAckReason::ServerBusy => 137,
109            ConnectAckReason::Banned => 138,
110            ConnectAckReason::BadAuthenticationMethod => 140,
111            ConnectAckReason::TopicNameInvalid => 144,
112            ConnectAckReason::PacketTooLarge => 149,
113            ConnectAckReason::QuotaExceeded => 151,
114            ConnectAckReason::PayloadFormatInvalid => 153,
115            ConnectAckReason::RetainNotSupported => 154,
116            ConnectAckReason::QosNotSupported => 155,
117            ConnectAckReason::UseAnotherServer => 156,
118            ConnectAckReason::ServerMoved => 157,
119            ConnectAckReason::ConnectionRateExceeded => 159,
120        }
121    }
122}
123
124impl ConnectAckReason {
125    pub fn reason(self) -> &'static str {
126        match self {
127            ConnectAckReason::Success => "Connection Accepted",
128            ConnectAckReason::UnsupportedProtocolVersion => "protocol version is not supported",
129            ConnectAckReason::ClientIdentifierNotValid => "client identifier is invalid",
130            ConnectAckReason::ServerUnavailable => "Server unavailable",
131            ConnectAckReason::BadUserNameOrPassword => "bad user name or password",
132            ConnectAckReason::NotAuthorized => "not authorized",
133            _ => "Connection Refused",
134        }
135    }
136}
137
138impl ConnectAck {
139    pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
140        ensure!(src.remaining() >= 2, DecodeError::InvalidLength);
141        let flags = ConnectAckFlags::from_bits(src.get_u8()).ok_or(DecodeError::ConnAckReservedFlagSet)?;
142
143        let reason_code = src.get_u8().try_into()?;
144
145        let prop_src = &mut utils::take_properties(src)?;
146
147        let mut session_expiry_interval_secs = None;
148        let mut receive_max = None;
149        let mut max_qos = None;
150        let mut retain_available = None;
151        let mut max_packet_size = None;
152        let mut assigned_client_id = None;
153        let mut topic_alias_max = None;
154        let mut reason_string = None;
155        let mut user_properties = Vec::new();
156        let mut wildcard_sub_avail = None;
157        let mut sub_ids_avail = None;
158        let mut shared_sub_avail = None;
159        let mut server_ka_sec = None;
160        let mut response_info = None;
161        let mut server_reference = None;
162        let mut auth_method = None;
163        let mut auth_data = None;
164        while prop_src.has_remaining() {
165            match prop_src.get_u8() {
166                pt::SESS_EXPIRY_INT => session_expiry_interval_secs.read_value(prop_src)?,
167                pt::RECEIVE_MAX => receive_max.read_value(prop_src)?,
168                pt::MAX_QOS => {
169                    ensure!(max_qos.is_none(), DecodeError::MalformedPacket); // property is set twice while not allowed
170                    ensure!(prop_src.has_remaining(), DecodeError::InvalidLength);
171                    max_qos = Some(prop_src.get_u8().try_into()?);
172                }
173                pt::RETAIN_AVAIL => retain_available.read_value(prop_src)?,
174                pt::MAX_PACKET_SIZE => max_packet_size.read_value(prop_src)?,
175                pt::ASSND_CLIENT_ID => assigned_client_id.read_value(prop_src)?,
176                pt::TOPIC_ALIAS_MAX => topic_alias_max.read_value(prop_src)?,
177                pt::REASON_STRING => reason_string.read_value(prop_src)?,
178                pt::USER => user_properties.push(UserProperty::decode(prop_src)?),
179                pt::WILDCARD_SUB_AVAIL => wildcard_sub_avail.read_value(prop_src)?,
180                pt::SUB_IDS_AVAIL => sub_ids_avail.read_value(prop_src)?,
181                pt::SHARED_SUB_AVAIL => shared_sub_avail.read_value(prop_src)?,
182                pt::SERVER_KA => server_ka_sec.read_value(prop_src)?,
183                pt::RESP_INFO => response_info.read_value(prop_src)?,
184                pt::SERVER_REF => server_reference.read_value(prop_src)?,
185                pt::AUTH_METHOD => auth_method.read_value(prop_src)?,
186                pt::AUTH_DATA => auth_data.read_value(prop_src)?,
187                _ => return Err(DecodeError::MalformedPacket),
188            }
189        }
190        ensure!(!src.has_remaining(), DecodeError::InvalidLength);
191
192        Ok(ConnectAck {
193            session_present: flags.contains(ConnectAckFlags::SESSION_PRESENT),
194            reason_code,
195            session_expiry_interval_secs,
196            receive_max: receive_max.unwrap_or(RECEIVE_MAX_DEFAULT),
197            max_qos: max_qos.unwrap_or(QoS::ExactlyOnce),
198            max_packet_size,
199            assigned_client_id,
200            topic_alias_max: topic_alias_max.unwrap_or(0u16),
201            retain_available: retain_available.unwrap_or(true),
202            wildcard_subscription_available: wildcard_sub_avail.unwrap_or(true),
203            subscription_identifiers_available: sub_ids_avail.unwrap_or(true),
204            shared_subscription_available: shared_sub_avail.unwrap_or(true),
205            server_keepalive_sec: server_ka_sec,
206            response_info,
207            server_reference,
208            auth_method,
209            auth_data,
210            reason_string,
211            user_properties,
212        })
213    }
214}
215
216impl EncodeLtd for ConnectAck {
217    fn encoded_size(&self, limit: u32) -> usize {
218        const HEADER_LEN: usize = 2; // state flags byte + reason code
219
220        let mut prop_len = encoded_property_size(&self.session_expiry_interval_secs)
221            + encoded_property_size_default(&self.receive_max, RECEIVE_MAX_DEFAULT)
222            + if self.max_qos < QoS::ExactlyOnce { 1 + 1 } else { 0 }
223            + encoded_property_size(&self.max_packet_size)
224            + encoded_property_size(&self.assigned_client_id)
225            + encoded_property_size_default(&self.retain_available, true)
226            + encoded_property_size_default(&self.wildcard_subscription_available, true)
227            + encoded_property_size_default(&self.subscription_identifiers_available, true)
228            + encoded_property_size_default(&self.shared_subscription_available, true)
229            + encoded_property_size(&self.server_keepalive_sec)
230            + encoded_property_size(&self.response_info)
231            + encoded_property_size(&self.server_reference)
232            + encoded_property_size(&self.auth_method)
233            + encoded_property_size(&self.auth_data);
234        if self.topic_alias_max > 0 {
235            prop_len += 1 + self.topic_alias_max.encoded_size(); // [property type, value..]
236        }
237
238        let diag_len = encoded_size_opt_props(
239            &self.user_properties,
240            &self.reason_string,
241            reduce_limit(limit, HEADER_LEN + 4 + prop_len),
242        ); // exclude other props and max of 4 bytes for property length value
243        prop_len += diag_len;
244        HEADER_LEN + var_int_len(prop_len) as usize + prop_len
245    }
246
247    fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
248        let start_len = buf.len();
249
250        buf.put_slice(&[u8::from(self.session_present), self.reason_code.into()]);
251
252        let prop_len = var_int_len_from_size(size - 2);
253        utils::write_variable_length(prop_len, buf);
254
255        encode_property(&self.session_expiry_interval_secs, pt::SESS_EXPIRY_INT, buf)?;
256        encode_property_default(&self.receive_max, RECEIVE_MAX_DEFAULT, pt::RECEIVE_MAX, buf)?;
257        if self.max_qos < QoS::ExactlyOnce {
258            buf.put_slice(&[pt::MAX_QOS, self.max_qos.into()]);
259        }
260        encode_property_default(&self.retain_available, true, pt::RETAIN_AVAIL, buf)?;
261        encode_property(&self.max_packet_size, pt::MAX_PACKET_SIZE, buf)?;
262        encode_property(&self.assigned_client_id, pt::ASSND_CLIENT_ID, buf)?;
263        encode_property_default(&self.topic_alias_max, 0, pt::TOPIC_ALIAS_MAX, buf)?;
264        encode_property_default(&self.wildcard_subscription_available, true, pt::WILDCARD_SUB_AVAIL, buf)?;
265        encode_property_default(&self.subscription_identifiers_available, true, pt::SUB_IDS_AVAIL, buf)?;
266        encode_property_default(&self.shared_subscription_available, true, pt::SHARED_SUB_AVAIL, buf)?;
267        encode_property(&self.server_keepalive_sec, pt::SERVER_KA, buf)?;
268        encode_property(&self.response_info, pt::RESP_INFO, buf)?;
269        encode_property(&self.server_reference, pt::SERVER_REF, buf)?;
270        encode_property(&self.auth_method, pt::AUTH_METHOD, buf)?;
271        encode_property(&self.auth_data, pt::AUTH_DATA, buf)?;
272
273        encode_opt_props(
274            &self.user_properties,
275            &self.reason_string,
276            buf,
277            size - (buf.len() - start_len) as u32,
278        )
279    }
280}