1use std::num::NonZeroU16;
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4use bytestring::ByteString;
5use serde::{Deserialize, Serialize};
6
7use crate::error::{DecodeError, EncodeError};
8use crate::types::{ConnectAckFlags, QoS};
9use crate::utils::{self, Decode, Encode, Property};
10use crate::v5::RECEIVE_MAX_DEFAULT;
11use crate::v5::{encode::*, property_type as pt, UserProperties, UserProperty};
12
13#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
15pub struct ConnectAck {
16 pub session_present: bool,
19 pub reason_code: ConnectAckReason,
20
21 pub session_expiry_interval_secs: Option<u32>,
22 pub receive_max: NonZeroU16,
23 pub max_qos: QoS,
24 pub max_packet_size: Option<u32>,
25 pub assigned_client_id: Option<ByteString>,
26 pub topic_alias_max: u16,
27 pub retain_available: bool,
28 pub wildcard_subscription_available: bool,
29 pub subscription_identifiers_available: bool,
30 pub shared_subscription_available: bool,
31 pub server_keepalive_sec: Option<u16>,
32 pub response_info: Option<ByteString>,
33 pub server_reference: Option<ByteString>,
34 pub auth_method: Option<ByteString>,
35 pub auth_data: Option<Bytes>,
36 pub reason_string: Option<ByteString>,
37 pub user_properties: UserProperties,
38}
39
40impl Default for ConnectAck {
41 fn default() -> ConnectAck {
42 ConnectAck {
43 session_present: false,
44 reason_code: ConnectAckReason::Success,
45 session_expiry_interval_secs: None,
46 receive_max: RECEIVE_MAX_DEFAULT,
47 max_qos: QoS::ExactlyOnce,
48 max_packet_size: None,
49 assigned_client_id: None,
50 topic_alias_max: 0,
51 retain_available: true,
52 wildcard_subscription_available: true,
53 subscription_identifiers_available: true,
54 shared_subscription_available: true,
55 server_keepalive_sec: None,
56 response_info: None,
57 server_reference: None,
58 auth_method: None,
59 auth_data: None,
60 reason_string: None,
61 user_properties: Vec::new(),
62 }
63 }
64}
65
// Reason codes a CONNACK packet can carry, declared with their numeric values:
// `Success` is 0 and every other code is 128 (0x80) or above.
// NOTE(review): `prim_enum!` is defined elsewhere; it presumably also generates
// the `TryFrom<u8>` conversion that `ConnectAck::decode` relies on — confirm.
prim_enum! {
    #[derive(Deserialize, Serialize)]
    pub enum ConnectAckReason {
        Success = 0, // 0x00
        UnspecifiedError = 128, // 0x80
        MalformedPacket = 129, // 0x81
        ProtocolError = 130, // 0x82
        ImplementationSpecificError = 131, // 0x83
        UnsupportedProtocolVersion = 132, // 0x84
        ClientIdentifierNotValid = 133, // 0x85
        BadUserNameOrPassword = 134, // 0x86
        NotAuthorized = 135, // 0x87
        ServerUnavailable = 136, // 0x88
        ServerBusy = 137, // 0x89
        Banned = 138, // 0x8A
        BadAuthenticationMethod = 140, // 0x8C
        TopicNameInvalid = 144, // 0x90
        PacketTooLarge = 149, // 0x95
        QuotaExceeded = 151, // 0x97
        PayloadFormatInvalid = 153, // 0x99
        RetainNotSupported = 154, // 0x9A
        QosNotSupported = 155, // 0x9B
        UseAnotherServer = 156, // 0x9C
        ServerMoved = 157, // 0x9D
        ConnectionRateExceeded = 159 // 0x9F
    }
}
94
95impl From<ConnectAckReason> for u8 {
96 fn from(v: ConnectAckReason) -> Self {
97 match v {
98 ConnectAckReason::Success => 0,
99 ConnectAckReason::UnspecifiedError => 128,
100 ConnectAckReason::MalformedPacket => 129,
101 ConnectAckReason::ProtocolError => 130,
102 ConnectAckReason::ImplementationSpecificError => 131,
103 ConnectAckReason::UnsupportedProtocolVersion => 132,
104 ConnectAckReason::ClientIdentifierNotValid => 133,
105 ConnectAckReason::BadUserNameOrPassword => 134,
106 ConnectAckReason::NotAuthorized => 135,
107 ConnectAckReason::ServerUnavailable => 136,
108 ConnectAckReason::ServerBusy => 137,
109 ConnectAckReason::Banned => 138,
110 ConnectAckReason::BadAuthenticationMethod => 140,
111 ConnectAckReason::TopicNameInvalid => 144,
112 ConnectAckReason::PacketTooLarge => 149,
113 ConnectAckReason::QuotaExceeded => 151,
114 ConnectAckReason::PayloadFormatInvalid => 153,
115 ConnectAckReason::RetainNotSupported => 154,
116 ConnectAckReason::QosNotSupported => 155,
117 ConnectAckReason::UseAnotherServer => 156,
118 ConnectAckReason::ServerMoved => 157,
119 ConnectAckReason::ConnectionRateExceeded => 159,
120 }
121 }
122}
123
124impl ConnectAckReason {
125 pub fn reason(self) -> &'static str {
126 match self {
127 ConnectAckReason::Success => "Connection Accepted",
128 ConnectAckReason::UnsupportedProtocolVersion => "protocol version is not supported",
129 ConnectAckReason::ClientIdentifierNotValid => "client identifier is invalid",
130 ConnectAckReason::ServerUnavailable => "Server unavailable",
131 ConnectAckReason::BadUserNameOrPassword => "bad user name or password",
132 ConnectAckReason::NotAuthorized => "not authorized",
133 _ => "Connection Refused",
134 }
135 }
136}
137
138impl ConnectAck {
139 pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
140 ensure!(src.remaining() >= 2, DecodeError::InvalidLength);
141 let flags = ConnectAckFlags::from_bits(src.get_u8()).ok_or(DecodeError::ConnAckReservedFlagSet)?;
142
143 let reason_code = src.get_u8().try_into()?;
144
145 let prop_src = &mut utils::take_properties(src)?;
146
147 let mut session_expiry_interval_secs = None;
148 let mut receive_max = None;
149 let mut max_qos = None;
150 let mut retain_available = None;
151 let mut max_packet_size = None;
152 let mut assigned_client_id = None;
153 let mut topic_alias_max = None;
154 let mut reason_string = None;
155 let mut user_properties = Vec::new();
156 let mut wildcard_sub_avail = None;
157 let mut sub_ids_avail = None;
158 let mut shared_sub_avail = None;
159 let mut server_ka_sec = None;
160 let mut response_info = None;
161 let mut server_reference = None;
162 let mut auth_method = None;
163 let mut auth_data = None;
164 while prop_src.has_remaining() {
165 match prop_src.get_u8() {
166 pt::SESS_EXPIRY_INT => session_expiry_interval_secs.read_value(prop_src)?,
167 pt::RECEIVE_MAX => receive_max.read_value(prop_src)?,
168 pt::MAX_QOS => {
169 ensure!(max_qos.is_none(), DecodeError::MalformedPacket); ensure!(prop_src.has_remaining(), DecodeError::InvalidLength);
171 max_qos = Some(prop_src.get_u8().try_into()?);
172 }
173 pt::RETAIN_AVAIL => retain_available.read_value(prop_src)?,
174 pt::MAX_PACKET_SIZE => max_packet_size.read_value(prop_src)?,
175 pt::ASSND_CLIENT_ID => assigned_client_id.read_value(prop_src)?,
176 pt::TOPIC_ALIAS_MAX => topic_alias_max.read_value(prop_src)?,
177 pt::REASON_STRING => reason_string.read_value(prop_src)?,
178 pt::USER => user_properties.push(UserProperty::decode(prop_src)?),
179 pt::WILDCARD_SUB_AVAIL => wildcard_sub_avail.read_value(prop_src)?,
180 pt::SUB_IDS_AVAIL => sub_ids_avail.read_value(prop_src)?,
181 pt::SHARED_SUB_AVAIL => shared_sub_avail.read_value(prop_src)?,
182 pt::SERVER_KA => server_ka_sec.read_value(prop_src)?,
183 pt::RESP_INFO => response_info.read_value(prop_src)?,
184 pt::SERVER_REF => server_reference.read_value(prop_src)?,
185 pt::AUTH_METHOD => auth_method.read_value(prop_src)?,
186 pt::AUTH_DATA => auth_data.read_value(prop_src)?,
187 _ => return Err(DecodeError::MalformedPacket),
188 }
189 }
190 ensure!(!src.has_remaining(), DecodeError::InvalidLength);
191
192 Ok(ConnectAck {
193 session_present: flags.contains(ConnectAckFlags::SESSION_PRESENT),
194 reason_code,
195 session_expiry_interval_secs,
196 receive_max: receive_max.unwrap_or(RECEIVE_MAX_DEFAULT),
197 max_qos: max_qos.unwrap_or(QoS::ExactlyOnce),
198 max_packet_size,
199 assigned_client_id,
200 topic_alias_max: topic_alias_max.unwrap_or(0u16),
201 retain_available: retain_available.unwrap_or(true),
202 wildcard_subscription_available: wildcard_sub_avail.unwrap_or(true),
203 subscription_identifiers_available: sub_ids_avail.unwrap_or(true),
204 shared_subscription_available: shared_sub_avail.unwrap_or(true),
205 server_keepalive_sec: server_ka_sec,
206 response_info,
207 server_reference,
208 auth_method,
209 auth_data,
210 reason_string,
211 user_properties,
212 })
213 }
214}
215
216impl EncodeLtd for ConnectAck {
217 fn encoded_size(&self, limit: u32) -> usize {
218 const HEADER_LEN: usize = 2; let mut prop_len = encoded_property_size(&self.session_expiry_interval_secs)
221 + encoded_property_size_default(&self.receive_max, RECEIVE_MAX_DEFAULT)
222 + if self.max_qos < QoS::ExactlyOnce { 1 + 1 } else { 0 }
223 + encoded_property_size(&self.max_packet_size)
224 + encoded_property_size(&self.assigned_client_id)
225 + encoded_property_size_default(&self.retain_available, true)
226 + encoded_property_size_default(&self.wildcard_subscription_available, true)
227 + encoded_property_size_default(&self.subscription_identifiers_available, true)
228 + encoded_property_size_default(&self.shared_subscription_available, true)
229 + encoded_property_size(&self.server_keepalive_sec)
230 + encoded_property_size(&self.response_info)
231 + encoded_property_size(&self.server_reference)
232 + encoded_property_size(&self.auth_method)
233 + encoded_property_size(&self.auth_data);
234 if self.topic_alias_max > 0 {
235 prop_len += 1 + self.topic_alias_max.encoded_size(); }
237
238 let diag_len = encoded_size_opt_props(
239 &self.user_properties,
240 &self.reason_string,
241 reduce_limit(limit, HEADER_LEN + 4 + prop_len),
242 ); prop_len += diag_len;
244 HEADER_LEN + var_int_len(prop_len) as usize + prop_len
245 }
246
247 fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
248 let start_len = buf.len();
249
250 buf.put_slice(&[u8::from(self.session_present), self.reason_code.into()]);
251
252 let prop_len = var_int_len_from_size(size - 2);
253 utils::write_variable_length(prop_len, buf);
254
255 encode_property(&self.session_expiry_interval_secs, pt::SESS_EXPIRY_INT, buf)?;
256 encode_property_default(&self.receive_max, RECEIVE_MAX_DEFAULT, pt::RECEIVE_MAX, buf)?;
257 if self.max_qos < QoS::ExactlyOnce {
258 buf.put_slice(&[pt::MAX_QOS, self.max_qos.into()]);
259 }
260 encode_property_default(&self.retain_available, true, pt::RETAIN_AVAIL, buf)?;
261 encode_property(&self.max_packet_size, pt::MAX_PACKET_SIZE, buf)?;
262 encode_property(&self.assigned_client_id, pt::ASSND_CLIENT_ID, buf)?;
263 encode_property_default(&self.topic_alias_max, 0, pt::TOPIC_ALIAS_MAX, buf)?;
264 encode_property_default(&self.wildcard_subscription_available, true, pt::WILDCARD_SUB_AVAIL, buf)?;
265 encode_property_default(&self.subscription_identifiers_available, true, pt::SUB_IDS_AVAIL, buf)?;
266 encode_property_default(&self.shared_subscription_available, true, pt::SHARED_SUB_AVAIL, buf)?;
267 encode_property(&self.server_keepalive_sec, pt::SERVER_KA, buf)?;
268 encode_property(&self.response_info, pt::RESP_INFO, buf)?;
269 encode_property(&self.server_reference, pt::SERVER_REF, buf)?;
270 encode_property(&self.auth_method, pt::AUTH_METHOD, buf)?;
271 encode_property(&self.auth_data, pt::AUTH_DATA, buf)?;
272
273 encode_opt_props(
274 &self.user_properties,
275 &self.reason_string,
276 buf,
277 size - (buf.len() - start_len) as u32,
278 )
279 }
280}