hebo_codec/v5/subscribe.rs
1// Copyright (c) 2021 Xu Shaohua <shaohua@biofan.org>. All rights reserved.
2// Use of this source is governed by Apache-2.0 License that can be found
3// in the LICENSE file.
4
5use std::convert::TryFrom;
6
7use super::{
8 property::check_multiple_subscription_identifiers, property::check_property_type_list,
9 Properties, PropertyType,
10};
11use crate::{
12 ByteArray, DecodeError, DecodePacket, EncodeError, EncodePacket, FixedHeader, Packet, PacketId,
13 PacketType, QoS, SubTopic, VarIntError,
14};
15
16#[repr(u8)]
17#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
18pub enum RetainHandling {
19 /// 0 = Send retained messages at the time of the subscribe.
20 #[default]
21 Send = 0,
22
23 /// 1 = Send retained messages at subscribe only if the subscription does not currently exist.
24 SendFirst = 1,
25
26 /// 2 = Do not send retained messages at the time of the subscribe.
27 NoSend = 2,
28}
29
30impl TryFrom<u8> for RetainHandling {
31 type Error = DecodeError;
32
33 fn try_from(v: u8) -> Result<Self, Self::Error> {
34 match v {
35 0 => Ok(Self::Send),
36 1 => Ok(Self::SendFirst),
37 2 => Ok(Self::NoSend),
38 _ => Err(DecodeError::OtherErrors),
39 }
40 }
41}
42
43/// Topic/QoS pair.
44#[allow(clippy::module_name_repetitions)]
45#[derive(Clone, Debug, Default, PartialEq, Eq)]
46pub struct SubscribeTopic {
47 /// Subscribed `topic` contains wildcard characters to match interested topics with patterns.
48 topic: SubTopic,
49
50 /// Bits 0 and 1 of the Subscription Options represent Maximum QoS field.
51 ///
52 /// This gives the maximum QoS level at which the Server can send Application Messages
53 /// to the Client. It is a Protocol Error if the Maximum QoS field has the value 3.
54 qos: QoS,
55
56 /// Bit 2 of the Subscription Options represents the No Local option.
57 ///
58 /// If the value is 1, Application Messages MUST NOT be forwarded to a connection
59 /// with a ClientID equal to the ClientID of the publishing connection [MQTT-3.8.3-3].
60 ///
61 /// It is a Protocol Error to set the No Local bit to 1 on a Shared Subscription [MQTT-3.8.3-4].
62 no_local: bool,
63
64 /// Bit 3 of the Subscription Options represents the Retain As Published option.
65 ///
66 /// If 1, Application Messages forwarded using this subscription keep the RETAIN flag
67 /// they were published with. If 0, Application Messages forwarded using this subscription
68 /// have the RETAIN flag set to 0. Retained messages sent when the subscription
69 /// is established have the RETAIN flag set to 1.
70 retain_as_published: bool,
71
72 /// Bits 4 and 5 of the Subscription Options represent the Retain Handling option.
73 ///
74 /// This option specifies whether retained messages are sent when the subscription
75 /// is established. This does not affect the sending of retained messages
76 /// at any point after the subscribe. If there are no retained messages
77 /// matching the Topic Filter, all of these values act the same. The values are:
78 ///
79 /// - 0 = Send retained messages at the time of the subscribe
80 /// - 1 = Send retained messages at subscribe only if the subscription does not currently exist
81 /// - 2 = Do not send retained messages at the time of the subscribe
82 ///
83 /// It is a Protocol Error to send a Retain Handling value of 3.
84 retain_handling: RetainHandling,
85}
86
87impl SubscribeTopic {
88 /// Create a new subscribe topic.
89 ///
90 /// # Errors
91 ///
92 /// Returns error if `topic` is invalid.
93 pub fn new(topic: &str, qos: QoS) -> Result<Self, EncodeError> {
94 let topic = SubTopic::new(topic)?;
95 Ok(Self {
96 topic,
97 qos,
98 ..Self::default()
99 })
100 }
101
102 /// Update topic pattern.
103 ///
104 /// # Errors
105 ///
106 /// Returns error if `topic` is invalid.
107 pub fn set_topic(&mut self, topic: &str) -> Result<&mut Self, EncodeError> {
108 self.topic = SubTopic::new(topic)?;
109 Ok(self)
110 }
111
112 /// Get current topic pattern.
113 #[must_use]
114 pub fn topic(&self) -> &str {
115 self.topic.as_ref()
116 }
117
118 /// Update `qos` value.
119 pub fn set_qos(&mut self, qos: QoS) -> &mut Self {
120 self.qos = qos;
121 self
122 }
123
124 /// Get current `QoS`.
125 #[must_use]
126 pub const fn qos(&self) -> QoS {
127 self.qos
128 }
129
130 /// Set `no_local` flag.
131 pub fn set_no_local(&mut self, no_local: bool) -> &mut Self {
132 self.no_local = no_local;
133 self
134 }
135
136 /// Get `no_local` flag.
137 #[must_use]
138 pub const fn no_local(&self) -> bool {
139 self.no_local
140 }
141
142 /// Update `retain_as_published` flag.
143 pub fn set_retain_as_published(&mut self, retain_as_published: bool) -> &mut Self {
144 self.retain_as_published = retain_as_published;
145 self
146 }
147
148 /// Get `retain_as_published` flag.
149 #[must_use]
150 pub const fn retain_as_published(&self) -> bool {
151 self.retain_as_published
152 }
153
154 /// Update `retain_handling` flag.
155 pub fn set_retain_handling(&mut self, retain_handling: RetainHandling) -> &mut Self {
156 self.retain_handling = retain_handling;
157 self
158 }
159
160 /// Get `retain_handling` flag.
161 #[must_use]
162 pub const fn retain_handling(&self) -> RetainHandling {
163 self.retain_handling
164 }
165
166 pub fn bytes(&self) -> usize {
167 1 + self.topic.bytes()
168 }
169}
170
171impl EncodePacket for SubscribeTopic {
172 fn encode(&self, buf: &mut Vec<u8>) -> Result<usize, EncodeError> {
173 self.topic.encode(buf)?;
174 let mut flag: u8 = 0b0000_0011 & (self.qos as u8);
175 if self.no_local {
176 flag |= 0b0000_0100;
177 }
178 if self.retain_as_published {
179 flag |= 0b0000_1000;
180 }
181 flag |= 0b0011_0000 & (self.retain_handling as u8);
182 buf.push(flag);
183
184 Ok(self.bytes())
185 }
186}
187
188impl DecodePacket for SubscribeTopic {
189 fn decode(ba: &mut ByteArray) -> Result<Self, DecodeError> {
190 let topic = SubTopic::decode(ba)?;
191
192 let flag = ba.read_byte()?;
193 // Bits 0 and 1 of the Subscription Options represent Maximum QoS field.
194 // This gives the maximum QoS level at which the Server can send
195 // Application Messages to the Client. It is a Protocol Error if
196 // the Maximum QoS field has the value 3.
197 let qos = QoS::try_from(flag & 0b0000_0011)?;
198
199 let no_local = (flag & 0b0000_0100) == 0b0000_0100;
200 let retain_as_published = (flag & 0b0000_1000) == 0b0000_1000;
201 let retain_handling = RetainHandling::try_from(flag & 0b0011_0000)?;
202
203 // Bits 6 and 7 of the Subscription Options byte are reserved for future use.
204 // The Server MUST treat a SUBSCRIBE packet as malformed if any of Reserved bits
205 // in the Payload are non-zero [MQTT-3.8.3-5].
206 if flag & 0b1100_0000 != 0b0000_0000 {
207 return Err(DecodeError::OtherErrors);
208 }
209
210 Ok(Self {
211 topic,
212 qos,
213 no_local,
214 retain_as_published,
215 retain_handling,
216 })
217 }
218}
219
220/// Subscribe packet is sent from the Client to the Server to subscribe one or more topics.
221/// This packet also specifies the maximum `QoS` with which the Server can send Application
222/// message to the Client.
223///
224/// Basic struct of this packet:
225///
226/// ```txt
227/// +----------------------------+
228/// | Fixed header |
229/// | |
230/// +----------------------------+
231/// | Packet Id |
232/// | |
233/// +----------------------------+
234/// | Properties ... |
235/// +----------------------------+
236/// | Topic 0 length |
237/// | |
238/// +----------------------------+
239/// | Topic 0 ... |
240/// +----------------------------+
241/// | Topic 0 QoS |
242/// +----------------------------+
243/// | Topic 1 length |
244/// | |
245/// +----------------------------+
246/// | Topic 1 ... |
247/// +----------------------------+
248/// | Tpoic 1 QoS |
249/// +----------------------------+
250/// | ... |
251/// +----------------------------+
252/// ```
253///
254/// Each topic name is followed by associated `QoS` flag.
255///
256/// If a Server receives a Subscribe packet containing a Topic Filter that is identical
257/// to an existing Subscription's Topic Filter then it must completely replace existing
258/// Subscription with a new Subscription. The Topic Filter in the new Subscription will
259/// be identical to the previous Subscription, also `QoS` may be different. Any existing
260/// retained message will be re-sent to the new Subscrption.
261#[allow(clippy::module_name_repetitions)]
262#[derive(Debug, Default, Clone, PartialEq, Eq)]
263pub struct SubscribePacket {
264 /// `packet_id` is used by the Server to reply SubscribeAckPacket to the client.
265 packet_id: PacketId,
266
267 properties: Properties,
268
269 /// A list of topic the Client subscribes to.
270 topics: Vec<SubscribeTopic>,
271}
272
273/// Properties available in subscribe packet.
274pub const SUBSCRIBE_PROPERTIES: &[PropertyType] = &[
275 // The Subscription Identifier can have the value of 1 to 268,435,455.
276 // It is a Protocol Error if the Subscription Identifier has a value of 0.
277 // It is a Protocol Error to include the Subscription Identifier more than once.
278 //
279 // The Subscription Identifier is associated with any subscription created or
280 // modified as the result of this SUBSCRIBE packet. If there is a Subscription Identifier,
281 // it is stored with the subscription. If this property is not specified,
282 // then the absence of a Subscription Identifier is stored with the subscription.
283 PropertyType::SubscriptionIdentifier,
284 PropertyType::UserProperty,
285];
286
287impl SubscribePacket {
288 /// Create a new subscribe packet.
289 ///
290 /// # Errors
291 ///
292 /// Returns error if `topic` pattern is invalid.
293 pub fn new(topic: &str, qos: QoS, packet_id: PacketId) -> Result<Self, EncodeError> {
294 let topic = SubscribeTopic::new(topic, qos)?;
295 Ok(Self {
296 packet_id,
297 properties: Properties::new(),
298 topics: vec![topic],
299 })
300 }
301
302 /// Update packet id.
303 pub fn set_packet_id(&mut self, packet_id: PacketId) -> &mut Self {
304 self.packet_id = packet_id;
305 self
306 }
307
308 /// Get current packet id.
309 #[must_use]
310 pub const fn packet_id(&self) -> PacketId {
311 self.packet_id
312 }
313
314 /// Get a mutable reference to property list.
315 pub fn properties_mut(&mut self) -> &mut Properties {
316 &mut self.properties
317 }
318
319 /// Get a reference to property list.
320 #[must_use]
321 pub const fn properties(&self) -> &Properties {
322 &self.properties
323 }
324
325 /// Update topic patterns.
326 pub fn set_topics(&mut self, topics: &[SubscribeTopic]) -> &mut Self {
327 self.topics.clear();
328 self.topics.extend_from_slice(topics);
329 self
330 }
331
332 /// Get a reference to topic patterns.
333 #[must_use]
334 pub fn topics(&self) -> &[SubscribeTopic] {
335 &self.topics
336 }
337
338 /// Get a mutable reference to topic patterns.
339 pub fn mut_topics(&mut self) -> &mut Vec<SubscribeTopic> {
340 &mut self.topics
341 }
342
343 fn get_fixed_header(&self) -> Result<FixedHeader, VarIntError> {
344 let mut remaining_length = PacketId::bytes();
345 for topic in &self.topics {
346 remaining_length += topic.bytes();
347 }
348
349 FixedHeader::new(PacketType::Subscribe, remaining_length)
350 }
351}
352
353impl DecodePacket for SubscribePacket {
354 fn decode(ba: &mut ByteArray) -> Result<Self, DecodeError> {
355 let fixed_header = FixedHeader::decode(ba)?;
356 if fixed_header.packet_type() != PacketType::Subscribe {
357 return Err(DecodeError::InvalidPacketType);
358 }
359
360 let packet_id = PacketId::decode(ba)?;
361 if packet_id.value() == 0 {
362 // SUBSCRIBE, UNSUBSCRIBE, and PUBLISH (in cases where QoS > 0) Control Packets
363 // MUST contain a non-zero 16-bit Packet Identifier. [MQTT-2.3.1-1]
364 return Err(DecodeError::InvalidPacketId);
365 }
366
367 let properties = Properties::decode(ba)?;
368 if let Err(property_type) =
369 check_property_type_list(properties.props(), SUBSCRIBE_PROPERTIES)
370 {
371 log::error!(
372 "v5/SubscribePacket: property type {:?} cannot be used in properties!",
373 property_type
374 );
375 return Err(DecodeError::InvalidPropertyType);
376 }
377 if let Err(property_type) = check_multiple_subscription_identifiers(properties.props()) {
378 log::error!(
379 "v5/SubscribePacket: property type {:?} cannot be used in properties!",
380 property_type
381 );
382 return Err(DecodeError::InvalidPropertyType);
383 }
384
385 let mut remaining_length = PacketId::bytes() + properties.bytes();
386 let mut topics = Vec::new();
387
388 // Parse topic/qos list.
389 while remaining_length < fixed_header.remaining_length() {
390 let topic = SubscribeTopic::decode(ba)?;
391 remaining_length += topic.bytes();
392 topics.push(topic);
393 }
394
395 // The payload of a SUBSCRIBE packet MUST contain at least one Topic Filter / QoS pair.
396 // A SUBSCRIBE packet with no payload is a protocol violation [MQTT-3.8.3-3].
397 if topics.is_empty() {
398 return Err(DecodeError::EmptyTopicFilter);
399 }
400
401 Ok(Self {
402 packet_id,
403 properties,
404 topics,
405 })
406 }
407}
408
409impl EncodePacket for SubscribePacket {
410 fn encode(&self, buf: &mut Vec<u8>) -> Result<usize, EncodeError> {
411 let old_len = buf.len();
412
413 let fixed_header = self.get_fixed_header()?;
414 fixed_header.encode(buf)?;
415
416 // Variable header
417 self.packet_id.encode(buf)?;
418
419 // Payload
420 for topic in &self.topics {
421 topic.encode(buf)?;
422 }
423
424 Ok(buf.len() - old_len)
425 }
426}
427
428impl Packet for SubscribePacket {
429 fn packet_type(&self) -> PacketType {
430 PacketType::Subscribe
431 }
432
433 fn bytes(&self) -> Result<usize, VarIntError> {
434 let fixed_header = self.get_fixed_header()?;
435 Ok(fixed_header.bytes() + fixed_header.remaining_length())
436 }
437}