1use std::num::NonZeroU16;
2
3use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut};
4
5use crate::error::{DecodeError, EncodeError};
6use crate::types::{ConnectAckFlags, QoS};
7use crate::utils::{self, Decode, Encode, Property};
8use crate::v5::codec::{encode::*, property_type as pt, UserProperties, UserProperty};
9use crate::v5::RECEIVE_MAX_DEFAULT;
10
/// Connect acknowledgement packet as handled by the v5 codec.
///
/// The fields mirror what `ConnectAck::decode` reads: a flags byte, a reason
/// code byte, and a property section. Properties that are absent on the wire
/// are represented either as `None` or by the default noted on the field.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ConnectAck {
    /// Set from the `ConnectAckFlags::SESSION_PRESENT` bit of the first byte.
    pub session_present: bool,
    /// Reason code taken from the second byte of the packet.
    pub reason_code: ConnectAckReason,

    /// Value of the `pt::SESS_EXPIRY_INT` property, if one was read.
    pub session_expiry_interval_secs: Option<u32>,
    /// Value of the `pt::RECEIVE_MAX` property; `RECEIVE_MAX_DEFAULT` when absent.
    pub receive_max: NonZeroU16,
    /// Value of the `pt::MAX_QOS` property; `QoS::ExactlyOnce` when absent.
    /// The encoder only writes it when it is below `QoS::ExactlyOnce`.
    pub max_qos: QoS,
    /// Value of the `pt::MAX_PACKET_SIZE` property, if one was read.
    pub max_packet_size: Option<u32>,
    /// Value of the `pt::ASSND_CLIENT_ID` property, if one was read.
    pub assigned_client_id: Option<ByteString>,
    /// Value of the `pt::TOPIC_ALIAS_MAX` property; `0` when absent.
    pub topic_alias_max: u16,
    /// Value of the `pt::RETAIN_AVAIL` property; `true` when absent.
    pub retain_available: bool,
    /// Value of the `pt::WILDCARD_SUB_AVAIL` property; `true` when absent.
    pub wildcard_subscription_available: bool,
    /// Value of the `pt::SUB_IDS_AVAIL` property; `true` when absent.
    pub subscription_identifiers_available: bool,
    /// Value of the `pt::SHARED_SUB_AVAIL` property; `true` when absent.
    pub shared_subscription_available: bool,
    /// Value of the `pt::SERVER_KA` property, if one was read.
    pub server_keepalive_sec: Option<u16>,
    /// Value of the `pt::RESP_INFO` property, if one was read.
    pub response_info: Option<ByteString>,
    /// Value of the `pt::SERVER_REF` property, if one was read.
    pub server_reference: Option<ByteString>,
    /// Value of the `pt::AUTH_METHOD` property, if one was read.
    pub auth_method: Option<ByteString>,
    /// Value of the `pt::AUTH_DATA` property, if one was read.
    pub auth_data: Option<Bytes>,
    /// Value of the `pt::REASON_STRING` property, if one was read.
    pub reason_string: Option<ByteString>,
    /// Every `pt::USER` property, in the order they were read.
    pub user_properties: UserProperties,
}
37
38impl Default for ConnectAck {
39 fn default() -> ConnectAck {
40 ConnectAck {
41 session_present: false,
42 reason_code: ConnectAckReason::Success,
43 session_expiry_interval_secs: None,
44 receive_max: RECEIVE_MAX_DEFAULT,
45 max_qos: QoS::ExactlyOnce,
46 max_packet_size: None,
47 assigned_client_id: None,
48 topic_alias_max: 0,
49 retain_available: true,
50 wildcard_subscription_available: true,
51 subscription_identifiers_available: true,
52 shared_subscription_available: true,
53 server_keepalive_sec: None,
54 response_info: None,
55 server_reference: None,
56 auth_method: None,
57 auth_data: None,
58 reason_string: None,
59 user_properties: Vec::new(),
60 }
61 }
62}
63
// Reason codes carried in the second byte of a `ConnectAck`.
//
// `ConnectAck::decode` turns the raw byte into this type with `try_into()` and
// the encoder turns it back into a byte with `into()`; those conversions are
// presumably generated by `prim_enum!`, which is defined elsewhere (confirm).
//
// `Success` (0) is the only variant below 128; every other variant listed here
// has a value of 128 or above.
prim_enum! {
    pub enum ConnectAckReason {
        Success = 0,
        UnspecifiedError = 128,
        MalformedPacket = 129,
        ProtocolError = 130,
        ImplementationSpecificError = 131,
        UnsupportedProtocolVersion = 132,
        ClientIdentifierNotValid = 133,
        BadUserNameOrPassword = 134,
        NotAuthorized = 135,
        ServerUnavailable = 136,
        ServerBusy = 137,
        Banned = 138,
        BadAuthenticationMethod = 140,
        TopicNameInvalid = 144,
        PacketTooLarge = 149,
        QuotaExceeded = 151,
        PayloadFormatInvalid = 153,
        RetainNotSupported = 154,
        QosNotSupported = 155,
        UseAnotherServer = 156,
        ServerMoved = 157,
        ConnectionRateExceeded = 159
    }
}
91
92impl ConnectAckReason {
93 pub fn reason(self) -> &'static str {
94 match self {
95 ConnectAckReason::Success => "Connection Accepted",
96 ConnectAckReason::UnsupportedProtocolVersion => "protocol version is not supported",
97 ConnectAckReason::ClientIdentifierNotValid => "client identifier is invalid",
98 ConnectAckReason::ServerUnavailable => "Server unavailable",
99 ConnectAckReason::BadUserNameOrPassword => "bad user name or password",
100 ConnectAckReason::NotAuthorized => "not authorized",
101 _ => "Connection Refused",
102 }
103 }
104}
105
106impl ConnectAck {
107 pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
108 ensure!(src.remaining() >= 2, DecodeError::InvalidLength);
109 let flags = ConnectAckFlags::from_bits(src.get_u8())
110 .ok_or(DecodeError::ConnAckReservedFlagSet)?;
111
112 let reason_code = src.get_u8().try_into()?;
113
114 let prop_src = &mut utils::take_properties(src)?;
115
116 let mut session_expiry_interval_secs = None;
117 let mut receive_max = None;
118 let mut max_qos = None;
119 let mut retain_available = None;
120 let mut max_packet_size = None;
121 let mut assigned_client_id = None;
122 let mut topic_alias_max = None;
123 let mut reason_string = None;
124 let mut user_properties = Vec::new();
125 let mut wildcard_sub_avail = None;
126 let mut sub_ids_avail = None;
127 let mut shared_sub_avail = None;
128 let mut server_ka_sec = None;
129 let mut response_info = None;
130 let mut server_reference = None;
131 let mut auth_method = None;
132 let mut auth_data = None;
133 while prop_src.has_remaining() {
134 match prop_src.get_u8() {
135 pt::SESS_EXPIRY_INT => session_expiry_interval_secs.read_value(prop_src)?,
136 pt::RECEIVE_MAX => receive_max.read_value(prop_src)?,
137 pt::MAX_QOS => {
138 ensure!(max_qos.is_none(), DecodeError::MalformedPacket); ensure!(prop_src.has_remaining(), DecodeError::InvalidLength);
140 max_qos = Some(prop_src.get_u8().try_into()?);
141 }
142 pt::RETAIN_AVAIL => retain_available.read_value(prop_src)?,
143 pt::MAX_PACKET_SIZE => max_packet_size.read_value(prop_src)?,
144 pt::ASSND_CLIENT_ID => assigned_client_id.read_value(prop_src)?,
145 pt::TOPIC_ALIAS_MAX => topic_alias_max.read_value(prop_src)?,
146 pt::REASON_STRING => reason_string.read_value(prop_src)?,
147 pt::USER => user_properties.push(UserProperty::decode(prop_src)?),
148 pt::WILDCARD_SUB_AVAIL => wildcard_sub_avail.read_value(prop_src)?,
149 pt::SUB_IDS_AVAIL => sub_ids_avail.read_value(prop_src)?,
150 pt::SHARED_SUB_AVAIL => shared_sub_avail.read_value(prop_src)?,
151 pt::SERVER_KA => server_ka_sec.read_value(prop_src)?,
152 pt::RESP_INFO => response_info.read_value(prop_src)?,
153 pt::SERVER_REF => server_reference.read_value(prop_src)?,
154 pt::AUTH_METHOD => auth_method.read_value(prop_src)?,
155 pt::AUTH_DATA => auth_data.read_value(prop_src)?,
156 _ => return Err(DecodeError::MalformedPacket),
157 }
158 }
159 ensure!(!src.has_remaining(), DecodeError::InvalidLength);
160
161 Ok(ConnectAck {
162 session_present: flags.contains(ConnectAckFlags::SESSION_PRESENT),
163 reason_code,
164 session_expiry_interval_secs,
165 receive_max: receive_max.unwrap_or(RECEIVE_MAX_DEFAULT),
166 max_qos: max_qos.unwrap_or(QoS::ExactlyOnce),
167 max_packet_size,
168 assigned_client_id,
169 topic_alias_max: topic_alias_max.unwrap_or(0u16),
170 retain_available: retain_available.unwrap_or(true),
171 wildcard_subscription_available: wildcard_sub_avail.unwrap_or(true),
172 subscription_identifiers_available: sub_ids_avail.unwrap_or(true),
173 shared_subscription_available: shared_sub_avail.unwrap_or(true),
174 server_keepalive_sec: server_ka_sec,
175 response_info,
176 server_reference,
177 auth_method,
178 auth_data,
179 reason_string,
180 user_properties,
181 })
182 }
183}
184
185impl EncodeLtd for ConnectAck {
186 fn encoded_size(&self, limit: u32) -> usize {
187 const HEADER_LEN: usize = 2; let mut prop_len = encoded_property_size(&self.session_expiry_interval_secs)
190 + encoded_property_size_default(&self.receive_max, RECEIVE_MAX_DEFAULT)
191 + if self.max_qos < QoS::ExactlyOnce { 1 + 1 } else { 0 }
192 + encoded_property_size(&self.max_packet_size)
193 + encoded_property_size(&self.assigned_client_id)
194 + encoded_property_size_default(&self.retain_available, true)
195 + encoded_property_size_default(&self.wildcard_subscription_available, true)
196 + encoded_property_size_default(&self.subscription_identifiers_available, true)
197 + encoded_property_size_default(&self.shared_subscription_available, true)
198 + encoded_property_size(&self.server_keepalive_sec)
199 + encoded_property_size(&self.response_info)
200 + encoded_property_size(&self.server_reference)
201 + encoded_property_size(&self.auth_method)
202 + encoded_property_size(&self.auth_data);
203 if self.topic_alias_max > 0 {
204 prop_len += 1 + self.topic_alias_max.encoded_size(); }
206
207 let diag_len = encoded_size_opt_props(
208 &self.user_properties,
209 &self.reason_string,
210 reduce_limit(limit, HEADER_LEN + 4 + prop_len),
211 ); prop_len += diag_len;
213 HEADER_LEN + var_int_len(prop_len) as usize + prop_len
214 }
215
216 fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
217 let start_len = buf.len();
218
219 buf.put_slice(&[u8::from(self.session_present), self.reason_code.into()]);
220
221 let prop_len = var_int_len_from_size(size - 2);
222 utils::write_variable_length(prop_len, buf);
223
224 encode_property(&self.session_expiry_interval_secs, pt::SESS_EXPIRY_INT, buf)?;
225 encode_property_default(&self.receive_max, RECEIVE_MAX_DEFAULT, pt::RECEIVE_MAX, buf)?;
226 if self.max_qos < QoS::ExactlyOnce {
227 buf.put_slice(&[pt::MAX_QOS, self.max_qos.into()]);
228 }
229 encode_property_default(&self.retain_available, true, pt::RETAIN_AVAIL, buf)?;
230 encode_property(&self.max_packet_size, pt::MAX_PACKET_SIZE, buf)?;
231 encode_property(&self.assigned_client_id, pt::ASSND_CLIENT_ID, buf)?;
232 encode_property_default(&self.topic_alias_max, 0, pt::TOPIC_ALIAS_MAX, buf)?;
233 encode_property_default(
234 &self.wildcard_subscription_available,
235 true,
236 pt::WILDCARD_SUB_AVAIL,
237 buf,
238 )?;
239 encode_property_default(
240 &self.subscription_identifiers_available,
241 true,
242 pt::SUB_IDS_AVAIL,
243 buf,
244 )?;
245 encode_property_default(
246 &self.shared_subscription_available,
247 true,
248 pt::SHARED_SUB_AVAIL,
249 buf,
250 )?;
251 encode_property(&self.server_keepalive_sec, pt::SERVER_KA, buf)?;
252 encode_property(&self.response_info, pt::RESP_INFO, buf)?;
253 encode_property(&self.server_reference, pt::SERVER_REF, buf)?;
254 encode_property(&self.auth_method, pt::AUTH_METHOD, buf)?;
255 encode_property(&self.auth_data, pt::AUTH_DATA, buf)?;
256
257 encode_opt_props(
258 &self.user_properties,
259 &self.reason_string,
260 buf,
261 size - (buf.len() - start_len) as u32,
262 )
263 }
264}