sage_mqtt/control/
connack.rs

1use crate::{
2    codec,
3    defaults::{
4        DEFAULT_MAXIMUM_QOS, DEFAULT_RECEIVE_MAXIMUM, DEFAULT_RETAIN_AVAILABLE,
5        DEFAULT_SHARED_SUBSCRIPTION_AVAILABLE, DEFAULT_SUBSCRIPTION_IDENTIFIER_AVAILABLE,
6        DEFAULT_TOPIC_ALIAS_MAXIMUM, DEFAULT_WILCARD_SUBSCRIPTION_AVAILABLE,
7    },
8    Authentication, ClientID, PropertiesDecoder, Property, QoS,
9    ReasonCode::{self, ProtocolError},
10    Result as SageResult,
11};
12use std::{convert::TryInto, marker::Unpin};
13use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
14
15/// The `Connack` message is sent from the server to the client to acknowledge
16/// the connection request. This can be the direct response to a `Connect`
17/// message or the closing exchange of `Connack` packets.
18#[derive(PartialEq, Debug, Clone)]
19pub struct ConnAck {
20    /// If the `session_present` is true, the connection is accepted using a
21    /// previously and unexpired session.
22    pub session_present: bool,
23
24    /// The reason code for the connect acknowledgement.
25    /// - `Success`
26    /// - `UnspecifiedError`
27    /// - `MalformedPacket`
28    /// - `ProtocolError`
29    /// - `ImplementationSpecificError`
30    /// - `UnsupportedProtocolVersion`
31    /// - `ClientIdentifierNotValid`
32    /// - `BadUserNameOrPassword`
33    /// - `NotAuthorized`
34    /// - `ServerUnavailable`
35    /// - `ServerBusy`
36    pub reason_code: ReasonCode,
37
38    /// The session expiry interval the server will use. If absent the server
39    /// simply accepted the value sent by the server in the `Connect` packet.
40    pub session_expiry_interval: Option<u32>,
41
42    /// The maximum number of `AtLeastOnce` and `ExactlyOnce` qualities of
43    /// service the server will concurrently treat for the client.
44    pub receive_maximum: u16,
45
46    /// The maximum quality of service the server is willing to accept.
47    /// This value cannot be `ExactlyOnce`. Any server receiving a message
48    /// with QoS higher than it's maximum is expected to close the connection.
49    pub maximum_qos: QoS,
50
51    /// `true` if the server supports retaining messages. `false` otherwise.
52    /// Sending retain messages (including Last Will) to a server which does not
53    /// support this feature will resulting in a disconnection.
54    pub retain_available: bool,
55
56    /// The maximum size in bytes the server is willing to accept. This value
57    /// cannot be `0`. If absent there is not maximum packet size.
58    pub maximum_packet_size: Option<u32>,
59
60    /// If the `Connect` packet did not have any client id, the server will
61    /// send one using `assigned_client_id`.
62    pub assigned_client_id: Option<ClientID>,
63
64    /// The maximum value the server will accept as topic alias. If `0` the
65    /// server does not accept topic aliases.
66    pub topic_alias_maximum: u16,
67
68    /// A human readable reason string used to describe the connack. This field
69    /// is optional.
70    pub reason_string: Option<String>,
71
72    /// General purpose user properties.
73    pub user_properties: Vec<(String, String)>,
74
75    /// If `true`, the server accepts subscribing to topics using wildcards.
76    pub wildcard_subscription_available: bool,
77
78    /// If `true`, the server accepts subscription identifiers.
79    pub subscription_identifiers_available: bool,
80
81    /// If `true`, the server accepts shared subscriptions.
82    pub shared_subscription_available: bool,
83
84    /// The server can override the keep alive value requested by the client
85    /// upon `Connect`.
86    pub keep_alive: Option<u16>,
87
88    /// If the client asked for response information, the server may send it
89    /// in `response_information`.
90    /// The response information can be used by the client as an hint to
91    /// generate reponse topic when making Request/Reponse messages.
92    pub response_information: Option<String>,
93
94    /// If the reason code is `ServerMoved` or `UserAnotherServer`, the
95    /// `reference` field is used to inform the client about why new server to
96    /// connect to instead.
97    pub reference: Option<String>,
98
99    /// Upon using enhanced connexion, ending the `Connack` exchange will result in
100    /// a `ConnAck` packet which may contain `Authentication` data.
101    pub authentication: Option<Authentication>,
102}
103
104impl Default for ConnAck {
105    fn default() -> Self {
106        ConnAck {
107            session_present: false,
108            reason_code: ReasonCode::Success,
109            session_expiry_interval: None,
110            receive_maximum: DEFAULT_RECEIVE_MAXIMUM,
111            maximum_qos: DEFAULT_MAXIMUM_QOS,
112            retain_available: DEFAULT_RETAIN_AVAILABLE,
113            maximum_packet_size: None,
114            assigned_client_id: None,
115            topic_alias_maximum: DEFAULT_TOPIC_ALIAS_MAXIMUM,
116            reason_string: Default::default(),
117            user_properties: Default::default(),
118            wildcard_subscription_available: DEFAULT_WILCARD_SUBSCRIPTION_AVAILABLE,
119            subscription_identifiers_available: DEFAULT_SUBSCRIPTION_IDENTIFIER_AVAILABLE,
120            shared_subscription_available: DEFAULT_SHARED_SUBSCRIPTION_AVAILABLE,
121            keep_alive: None,
122            response_information: Default::default(),
123            reference: None,
124            authentication: None,
125        }
126    }
127}
128
129impl ConnAck {
130    pub(crate) async fn write<W: AsyncWrite + Unpin>(self, writer: &mut W) -> SageResult<usize> {
131        let mut n_bytes = codec::write_bool(self.session_present, writer).await?;
132        n_bytes += codec::write_reason_code(self.reason_code, writer).await?;
133
134        let mut properties = Vec::new();
135
136        if let Some(v) = self.session_expiry_interval {
137            n_bytes += Property::SessionExpiryInterval(v)
138                .encode(&mut properties)
139                .await?;
140        }
141        n_bytes += Property::ReceiveMaximum(self.receive_maximum)
142            .encode(&mut properties)
143            .await?;
144        n_bytes += Property::MaximumQoS(self.maximum_qos)
145            .encode(&mut properties)
146            .await?;
147        n_bytes += Property::RetainAvailable(self.retain_available)
148            .encode(&mut properties)
149            .await?;
150        if let Some(v) = self.maximum_packet_size {
151            n_bytes += Property::MaximumPacketSize(v)
152                .encode(&mut properties)
153                .await?;
154        }
155        if let Some(v) = self.assigned_client_id {
156            n_bytes += Property::AssignedClientIdentifier(v)
157                .encode(&mut properties)
158                .await?;
159        }
160        n_bytes += Property::TopicAliasMaximum(self.topic_alias_maximum)
161            .encode(&mut properties)
162            .await?;
163        if let Some(reason_string) = self.reason_string {
164            if !reason_string.is_empty() {
165                n_bytes += Property::ReasonString(reason_string)
166                    .encode(&mut properties)
167                    .await?;
168            }
169        }
170        for (k, v) in self.user_properties {
171            n_bytes += Property::UserProperty(k, v).encode(&mut properties).await?;
172        }
173        n_bytes += Property::WildcardSubscriptionAvailable(self.wildcard_subscription_available)
174            .encode(&mut properties)
175            .await?;
176        n_bytes += Property::SharedSubscriptionAvailable(self.shared_subscription_available)
177            .encode(&mut properties)
178            .await?;
179        if let Some(v) = self.keep_alive {
180            n_bytes += Property::ServerKeepAlive(v).encode(&mut properties).await?;
181        }
182
183        if let Some(v) = self.response_information {
184            n_bytes += Property::ResponseInformation(v)
185                .encode(&mut properties)
186                .await?;
187        }
188
189        if let Some(v) = self.reference {
190            n_bytes += Property::ServerReference(v).encode(&mut properties).await?;
191        }
192        if let Some(authentication) = self.authentication {
193            n_bytes += Property::AuthenticationMethod(authentication.method)
194                .encode(&mut properties)
195                .await?;
196            if !authentication.data.is_empty() {
197                n_bytes += Property::AuthenticationData(authentication.data)
198                    .encode(&mut properties)
199                    .await?;
200            }
201        }
202
203        n_bytes += codec::write_variable_byte_integer(properties.len() as u32, writer).await?;
204        writer.write_all(&properties).await?;
205
206        Ok(n_bytes)
207    }
208
209    pub(crate) async fn read<R: AsyncRead + Unpin>(reader: &mut R) -> SageResult<Self> {
210        let session_present = codec::read_bool(reader).await?;
211
212        let reason_code = codec::read_byte(reader).await?.try_into()?;
213
214        let mut session_expiry_interval = None;
215        let mut receive_maximum = DEFAULT_RECEIVE_MAXIMUM;
216        let mut maximum_qos = DEFAULT_MAXIMUM_QOS;
217        let mut retain_available = DEFAULT_RETAIN_AVAILABLE;
218        let mut maximum_packet_size = None;
219        let mut assigned_client_id = None;
220        let mut topic_alias_maximum = DEFAULT_TOPIC_ALIAS_MAXIMUM;
221        let mut reason_string = None;
222        let mut user_properties = Vec::new();
223        let mut wildcard_subscription_available = DEFAULT_WILCARD_SUBSCRIPTION_AVAILABLE;
224        let mut subscription_identifiers_available = DEFAULT_SUBSCRIPTION_IDENTIFIER_AVAILABLE;
225        let mut shared_subscription_available = DEFAULT_SHARED_SUBSCRIPTION_AVAILABLE;
226        let mut keep_alive = None;
227        let mut response_information = None;
228        let mut reference = None;
229        let mut authentication_method = None;
230        let mut authentication_data = Default::default();
231
232        let mut decoder = PropertiesDecoder::take(reader).await?;
233        while decoder.has_properties() {
234            match decoder.read().await? {
235                Property::SessionExpiryInterval(v) => session_expiry_interval = Some(v),
236                Property::ReceiveMaximum(v) => receive_maximum = v,
237                Property::MaximumQoS(v) => maximum_qos = v,
238                Property::RetainAvailable(v) => retain_available = v,
239                Property::MaximumPacketSize(v) => maximum_packet_size = Some(v),
240                Property::AssignedClientIdentifier(v) => assigned_client_id = Some(v),
241                Property::TopicAliasMaximum(v) => topic_alias_maximum = v,
242                Property::ReasonString(v) => {
243                    reason_string = if v.is_empty() { None } else { Some(v) }
244                }
245                Property::UserProperty(k, v) => user_properties.push((k, v)),
246                Property::WildcardSubscriptionAvailable(v) => wildcard_subscription_available = v,
247                Property::SubscriptionIdentifiersAvailable(v) => {
248                    subscription_identifiers_available = v
249                }
250                Property::SharedSubscriptionAvailable(v) => shared_subscription_available = v,
251                Property::ServerKeepAlive(v) => keep_alive = Some(v),
252                Property::ResponseInformation(v) => response_information = Some(v),
253                Property::ServerReference(v) => reference = Some(v),
254                Property::AuthenticationMethod(v) => authentication_method = Some(v),
255                Property::AuthenticationData(v) => authentication_data = v,
256                _ => return Err(ProtocolError.into()),
257            }
258        }
259
260        let authentication = if let Some(method) = authentication_method {
261            Some(Authentication {
262                method,
263                data: authentication_data,
264            })
265        } else {
266            if !authentication_data.is_empty() {
267                return Err(ProtocolError.into());
268            }
269            None
270        };
271
272        Ok(ConnAck {
273            session_present,
274            reason_code,
275            session_expiry_interval,
276            receive_maximum,
277            maximum_qos,
278            retain_available,
279            maximum_packet_size,
280            assigned_client_id,
281            topic_alias_maximum,
282            reason_string,
283            user_properties,
284            wildcard_subscription_available,
285            subscription_identifiers_available,
286            shared_subscription_available,
287            keep_alive,
288            response_information,
289            reference,
290            authentication,
291        })
292    }
293}
294
295#[cfg(test)]
296mod unit {
297
298    use super::*;
299    use std::io::Cursor;
300
301    fn encoded() -> Vec<u8> {
302        vec![
303            1, 138, 111, 17, 0, 0, 5, 57, 33, 0, 30, 36, 1, 37, 0, 39, 0, 0, 1, 0, 18, 0, 11, 87,
304            97, 108, 107, 84, 104, 105, 115, 87, 97, 121, 34, 0, 10, 31, 0, 7, 82, 85, 78, 45, 68,
305            77, 67, 38, 0, 7, 77, 111, 103, 119, 97, 195, 175, 0, 3, 67, 97, 116, 40, 0, 42, 0, 19,
306            0, 17, 26, 0, 9, 65, 101, 114, 111, 115, 109, 105, 116, 104, 28, 0, 14, 80, 97, 105,
307            110, 116, 32, 73, 116, 32, 66, 108, 97, 99, 107, 21, 0, 6, 87, 105, 108, 108, 111, 119,
308            22, 0, 4, 13, 21, 234, 94,
309        ]
310    }
311
312    fn decoded() -> ConnAck {
313        ConnAck {
314            session_present: true,
315            reason_code: ReasonCode::Banned,
316            session_expiry_interval: Some(1337),
317            receive_maximum: 30,
318            maximum_qos: QoS::AtLeastOnce,
319            retain_available: false,
320            maximum_packet_size: Some(256),
321            assigned_client_id: Some("WalkThisWay".into()),
322            topic_alias_maximum: 10,
323            reason_string: Some("RUN-DMC".into()),
324            user_properties: vec![("Mogwaï".into(), "Cat".into())],
325            wildcard_subscription_available: false,
326            subscription_identifiers_available: true,
327            shared_subscription_available: false,
328            keep_alive: Some(17),
329            response_information: Some("Aerosmith".into()),
330            reference: Some("Paint It Black".into()),
331            authentication: Some(Authentication {
332                method: "Willow".into(),
333                data: vec![0x0D, 0x15, 0xEA, 0x5E],
334            }),
335        }
336    }
337
338    #[tokio::test]
339    async fn encode() {
340        let test_data = decoded();
341        let mut tested_result = Vec::new();
342        let n_bytes = test_data.write(&mut tested_result).await.unwrap();
343        assert_eq!(tested_result, encoded());
344        assert_eq!(n_bytes, 114);
345    }
346
347    #[tokio::test]
348    async fn decode() {
349        let mut test_data = Cursor::new(encoded());
350        let tested_result = ConnAck::read(&mut test_data).await.unwrap();
351        assert_eq!(tested_result, decoded());
352    }
353}