ntex_mqtt/v5/codec/packet/
connack.rs

1use std::num::NonZeroU16;
2
3use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut};
4
5use crate::error::{DecodeError, EncodeError};
6use crate::types::{ConnectAckFlags, QoS};
7use crate::utils::{self, Decode, Encode, Property};
8use crate::v5::codec::{encode::*, property_type as pt, UserProperties, UserProperty};
9use crate::v5::RECEIVE_MAX_DEFAULT;
10
11/// Connect acknowledgment packet
12#[derive(Debug, PartialEq, Eq, Clone)]
13pub struct ConnectAck {
14    /// enables a Client to establish whether the Client and Server have a consistent view
15    /// about whether there is already stored Session state.
16    pub session_present: bool,
17    pub reason_code: ConnectAckReason,
18
19    pub session_expiry_interval_secs: Option<u32>,
20    pub receive_max: NonZeroU16,
21    pub max_qos: QoS,
22    pub max_packet_size: Option<u32>,
23    pub assigned_client_id: Option<ByteString>,
24    pub topic_alias_max: u16,
25    pub retain_available: bool,
26    pub wildcard_subscription_available: bool,
27    pub subscription_identifiers_available: bool,
28    pub shared_subscription_available: bool,
29    pub server_keepalive_sec: Option<u16>,
30    pub response_info: Option<ByteString>,
31    pub server_reference: Option<ByteString>,
32    pub auth_method: Option<ByteString>,
33    pub auth_data: Option<Bytes>,
34    pub reason_string: Option<ByteString>,
35    pub user_properties: UserProperties,
36}
37
38impl Default for ConnectAck {
39    fn default() -> ConnectAck {
40        ConnectAck {
41            session_present: false,
42            reason_code: ConnectAckReason::Success,
43            session_expiry_interval_secs: None,
44            receive_max: RECEIVE_MAX_DEFAULT,
45            max_qos: QoS::ExactlyOnce,
46            max_packet_size: None,
47            assigned_client_id: None,
48            topic_alias_max: 0,
49            retain_available: true,
50            wildcard_subscription_available: true,
51            subscription_identifiers_available: true,
52            shared_subscription_available: true,
53            server_keepalive_sec: None,
54            response_info: None,
55            server_reference: None,
56            auth_method: None,
57            auth_data: None,
58            reason_string: None,
59            user_properties: Vec::new(),
60        }
61    }
62}
63
// Declares `ConnectAckReason` through the crate's `prim_enum!` macro, pairing
// each variant with its numeric code. The macro itself is defined outside this
// file; presumably it is what provides the `u8` conversions that `decode`
// (`try_into`) and `encode` (`into`) use on this type — confirm against the
// macro definition.
prim_enum! {
    /// CONNACK reason codes
    pub enum ConnectAckReason {
        Success = 0,
        // Every code below is 128 or greater.
        UnspecifiedError = 128,
        MalformedPacket = 129,
        ProtocolError = 130,
        ImplementationSpecificError = 131,
        UnsupportedProtocolVersion = 132,
        ClientIdentifierNotValid = 133,
        BadUserNameOrPassword = 134,
        NotAuthorized = 135,
        ServerUnavailable = 136,
        ServerBusy = 137,
        Banned = 138,
        BadAuthenticationMethod = 140,
        TopicNameInvalid = 144,
        PacketTooLarge = 149,
        QuotaExceeded = 151,
        PayloadFormatInvalid = 153,
        RetainNotSupported = 154,
        QosNotSupported = 155,
        UseAnotherServer = 156,
        ServerMoved = 157,
        ConnectionRateExceeded = 159
    }
}
91
92impl ConnectAckReason {
93    pub fn reason(self) -> &'static str {
94        match self {
95            ConnectAckReason::Success => "Connection Accepted",
96            ConnectAckReason::UnsupportedProtocolVersion => "protocol version is not supported",
97            ConnectAckReason::ClientIdentifierNotValid => "client identifier is invalid",
98            ConnectAckReason::ServerUnavailable => "Server unavailable",
99            ConnectAckReason::BadUserNameOrPassword => "bad user name or password",
100            ConnectAckReason::NotAuthorized => "not authorized",
101            _ => "Connection Refused",
102        }
103    }
104}
105
106impl ConnectAck {
107    pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
108        ensure!(src.remaining() >= 2, DecodeError::InvalidLength);
109        let flags = ConnectAckFlags::from_bits(src.get_u8())
110            .ok_or(DecodeError::ConnAckReservedFlagSet)?;
111
112        let reason_code = src.get_u8().try_into()?;
113
114        let prop_src = &mut utils::take_properties(src)?;
115
116        let mut session_expiry_interval_secs = None;
117        let mut receive_max = None;
118        let mut max_qos = None;
119        let mut retain_available = None;
120        let mut max_packet_size = None;
121        let mut assigned_client_id = None;
122        let mut topic_alias_max = None;
123        let mut reason_string = None;
124        let mut user_properties = Vec::new();
125        let mut wildcard_sub_avail = None;
126        let mut sub_ids_avail = None;
127        let mut shared_sub_avail = None;
128        let mut server_ka_sec = None;
129        let mut response_info = None;
130        let mut server_reference = None;
131        let mut auth_method = None;
132        let mut auth_data = None;
133        while prop_src.has_remaining() {
134            match prop_src.get_u8() {
135                pt::SESS_EXPIRY_INT => session_expiry_interval_secs.read_value(prop_src)?,
136                pt::RECEIVE_MAX => receive_max.read_value(prop_src)?,
137                pt::MAX_QOS => {
138                    ensure!(max_qos.is_none(), DecodeError::MalformedPacket); // property is set twice while not allowed
139                    ensure!(prop_src.has_remaining(), DecodeError::InvalidLength);
140                    max_qos = Some(prop_src.get_u8().try_into()?);
141                }
142                pt::RETAIN_AVAIL => retain_available.read_value(prop_src)?,
143                pt::MAX_PACKET_SIZE => max_packet_size.read_value(prop_src)?,
144                pt::ASSND_CLIENT_ID => assigned_client_id.read_value(prop_src)?,
145                pt::TOPIC_ALIAS_MAX => topic_alias_max.read_value(prop_src)?,
146                pt::REASON_STRING => reason_string.read_value(prop_src)?,
147                pt::USER => user_properties.push(UserProperty::decode(prop_src)?),
148                pt::WILDCARD_SUB_AVAIL => wildcard_sub_avail.read_value(prop_src)?,
149                pt::SUB_IDS_AVAIL => sub_ids_avail.read_value(prop_src)?,
150                pt::SHARED_SUB_AVAIL => shared_sub_avail.read_value(prop_src)?,
151                pt::SERVER_KA => server_ka_sec.read_value(prop_src)?,
152                pt::RESP_INFO => response_info.read_value(prop_src)?,
153                pt::SERVER_REF => server_reference.read_value(prop_src)?,
154                pt::AUTH_METHOD => auth_method.read_value(prop_src)?,
155                pt::AUTH_DATA => auth_data.read_value(prop_src)?,
156                _ => return Err(DecodeError::MalformedPacket),
157            }
158        }
159        ensure!(!src.has_remaining(), DecodeError::InvalidLength);
160
161        Ok(ConnectAck {
162            session_present: flags.contains(ConnectAckFlags::SESSION_PRESENT),
163            reason_code,
164            session_expiry_interval_secs,
165            receive_max: receive_max.unwrap_or(RECEIVE_MAX_DEFAULT),
166            max_qos: max_qos.unwrap_or(QoS::ExactlyOnce),
167            max_packet_size,
168            assigned_client_id,
169            topic_alias_max: topic_alias_max.unwrap_or(0u16),
170            retain_available: retain_available.unwrap_or(true),
171            wildcard_subscription_available: wildcard_sub_avail.unwrap_or(true),
172            subscription_identifiers_available: sub_ids_avail.unwrap_or(true),
173            shared_subscription_available: shared_sub_avail.unwrap_or(true),
174            server_keepalive_sec: server_ka_sec,
175            response_info,
176            server_reference,
177            auth_method,
178            auth_data,
179            reason_string,
180            user_properties,
181        })
182    }
183}
184
185impl EncodeLtd for ConnectAck {
186    fn encoded_size(&self, limit: u32) -> usize {
187        const HEADER_LEN: usize = 2; // state flags byte + reason code
188
189        let mut prop_len = encoded_property_size(&self.session_expiry_interval_secs)
190            + encoded_property_size_default(&self.receive_max, RECEIVE_MAX_DEFAULT)
191            + if self.max_qos < QoS::ExactlyOnce { 1 + 1 } else { 0 }
192            + encoded_property_size(&self.max_packet_size)
193            + encoded_property_size(&self.assigned_client_id)
194            + encoded_property_size_default(&self.retain_available, true)
195            + encoded_property_size_default(&self.wildcard_subscription_available, true)
196            + encoded_property_size_default(&self.subscription_identifiers_available, true)
197            + encoded_property_size_default(&self.shared_subscription_available, true)
198            + encoded_property_size(&self.server_keepalive_sec)
199            + encoded_property_size(&self.response_info)
200            + encoded_property_size(&self.server_reference)
201            + encoded_property_size(&self.auth_method)
202            + encoded_property_size(&self.auth_data);
203        if self.topic_alias_max > 0 {
204            prop_len += 1 + self.topic_alias_max.encoded_size(); // [property type, value..]
205        }
206
207        let diag_len = encoded_size_opt_props(
208            &self.user_properties,
209            &self.reason_string,
210            reduce_limit(limit, HEADER_LEN + 4 + prop_len),
211        ); // exclude other props and max of 4 bytes for property length value
212        prop_len += diag_len;
213        HEADER_LEN + var_int_len(prop_len) as usize + prop_len
214    }
215
216    fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
217        let start_len = buf.len();
218
219        buf.put_slice(&[u8::from(self.session_present), self.reason_code.into()]);
220
221        let prop_len = var_int_len_from_size(size - 2);
222        utils::write_variable_length(prop_len, buf);
223
224        encode_property(&self.session_expiry_interval_secs, pt::SESS_EXPIRY_INT, buf)?;
225        encode_property_default(&self.receive_max, RECEIVE_MAX_DEFAULT, pt::RECEIVE_MAX, buf)?;
226        if self.max_qos < QoS::ExactlyOnce {
227            buf.put_slice(&[pt::MAX_QOS, self.max_qos.into()]);
228        }
229        encode_property_default(&self.retain_available, true, pt::RETAIN_AVAIL, buf)?;
230        encode_property(&self.max_packet_size, pt::MAX_PACKET_SIZE, buf)?;
231        encode_property(&self.assigned_client_id, pt::ASSND_CLIENT_ID, buf)?;
232        encode_property_default(&self.topic_alias_max, 0, pt::TOPIC_ALIAS_MAX, buf)?;
233        encode_property_default(
234            &self.wildcard_subscription_available,
235            true,
236            pt::WILDCARD_SUB_AVAIL,
237            buf,
238        )?;
239        encode_property_default(
240            &self.subscription_identifiers_available,
241            true,
242            pt::SUB_IDS_AVAIL,
243            buf,
244        )?;
245        encode_property_default(
246            &self.shared_subscription_available,
247            true,
248            pt::SHARED_SUB_AVAIL,
249            buf,
250        )?;
251        encode_property(&self.server_keepalive_sec, pt::SERVER_KA, buf)?;
252        encode_property(&self.response_info, pt::RESP_INFO, buf)?;
253        encode_property(&self.server_reference, pt::SERVER_REF, buf)?;
254        encode_property(&self.auth_method, pt::AUTH_METHOD, buf)?;
255        encode_property(&self.auth_data, pt::AUTH_DATA, buf)?;
256
257        encode_opt_props(
258            &self.user_properties,
259            &self.reason_string,
260            buf,
261            size - (buf.len() - start_len) as u32,
262        )
263    }
264}