1use std::num::{NonZeroU16, NonZeroU32};
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4use bytestring::ByteString;
5use serde::{Deserialize, Serialize};
6
7use super::ack_props;
8use crate::error::{DecodeError, EncodeError};
9use crate::types::QoS;
10use crate::utils::{self, write_variable_length, Decode, Encode};
11use crate::v5::{encode::*, property_type as pt, UserProperties, UserProperty};
12
/// Contents of a SUBSCRIBE packet.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Subscribe {
    /// Packet identifier; the type guarantees it is never zero.
    pub packet_id: NonZeroU16,
    /// Optional subscription identifier, carried on the wire as the `SUB_ID`
    /// property (a non-zero variable-length integer).
    pub id: Option<NonZeroU32>,
    /// User properties attached to the packet.
    pub user_properties: UserProperties,
    /// Topic filters to subscribe to, each paired with its subscription options.
    pub topic_filters: Vec<(ByteString, SubscriptionOptions)>,
}
24
/// Per-topic-filter options of a SUBSCRIBE packet.
///
/// All four fields are packed into a single byte on the wire; the bit
/// positions below are the ones used by this type's `Decode`/`Encode` impls.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct SubscriptionOptions {
    /// Quality of service for the subscription (bits 0-1).
    pub qos: QoS,
    /// "No Local" flag (bit 2).
    pub no_local: bool,
    /// "Retain As Published" flag (bit 3).
    pub retain_as_published: bool,
    /// Retain handling mode (bits 4-5).
    pub retain_handling: RetainHandling,
}
32
33impl Default for SubscriptionOptions {
34 fn default() -> Self {
35 Self {
36 qos: QoS::AtMostOnce,
37 no_local: false,
38 retain_as_published: false,
39 retain_handling: RetainHandling::AtSubscribe,
40 }
41 }
42}
43
// Retain-handling mode of a subscription: the two-bit value stored in
// bits 4-5 of the subscription options byte.
// NOTE(review): the variant meanings below follow the MQTT 5 "Retain Handling"
// option as suggested by the names — confirm against the specification.
prim_enum! {
    pub enum RetainHandling {
        // presumably: send retained messages at the time of the subscribe
        AtSubscribe = 0,
        // presumably: send retained messages only if the subscription is new
        AtSubscribeNew = 1,
        // presumably: do not send retained messages at the time of the subscribe
        NoAtSubscribe = 2
    }
}
51
52impl From<RetainHandling> for u8 {
53 fn from(v: RetainHandling) -> Self {
54 match v {
55 RetainHandling::AtSubscribe => 0,
56 RetainHandling::AtSubscribeNew => 1,
57 RetainHandling::NoAtSubscribe => 2,
58 }
59 }
60}
61
/// Contents of a SUBACK packet.
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
pub struct SubscribeAck {
    /// Packet identifier; the type guarantees it is never zero.
    pub packet_id: NonZeroU16,
    /// User properties attached to the acknowledgement.
    pub properties: UserProperties,
    /// Optional reason string.
    pub reason_string: Option<ByteString>,
    /// Reason codes; each one occupies a single byte of the packet payload.
    pub status: Vec<SubscribeAckReason>,
}
71
/// Contents of an UNSUBSCRIBE packet.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Unsubscribe {
    /// Packet identifier; the type guarantees it is never zero.
    pub packet_id: NonZeroU16,
    /// User properties attached to the packet.
    pub user_properties: UserProperties,
    /// Topic filters to unsubscribe from.
    pub topic_filters: Vec<ByteString>,
}
81
/// Contents of an UNSUBACK packet.
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
pub struct UnsubscribeAck {
    /// Packet identifier; the type guarantees it is never zero.
    pub packet_id: NonZeroU16,
    /// User properties attached to the acknowledgement.
    pub properties: UserProperties,
    /// Optional reason string.
    pub reason_string: Option<ByteString>,
    /// Reason codes; each one occupies a single byte of the packet payload.
    pub status: Vec<UnsubscribeAckReason>,
}
91
// Reason codes carried in a SUBACK payload (one byte per entry of
// `SubscribeAck::status`). The numeric values are the on-wire codes and
// match the `From<SubscribeAckReason> for u8` conversion.
prim_enum! {
    #[derive(Deserialize, Serialize)]
    pub enum SubscribeAckReason {
        // Codes 0-2: the subscription was granted at the named QoS.
        GrantedQos0 = 0,
        GrantedQos1 = 1,
        GrantedQos2 = 2,
        // Codes from 128 upwards: the names describe failures.
        UnspecifiedError = 128,
        ImplementationSpecificError = 131,
        NotAuthorized = 135,
        TopicFilterInvalid = 143,
        PacketIdentifierInUse = 145,
        QuotaExceeded = 151,
        SharedSubscriptionNotSupported = 158,
        SubscriptionIdentifiersNotSupported = 161,
        WildcardSubscriptionsNotSupported = 162
    }
}
110
111impl From<SubscribeAckReason> for u8 {
112 fn from(v: SubscribeAckReason) -> Self {
113 match v {
114 SubscribeAckReason::GrantedQos0 => 0,
115 SubscribeAckReason::GrantedQos1 => 1,
116 SubscribeAckReason::GrantedQos2 => 2,
117 SubscribeAckReason::UnspecifiedError => 128,
118 SubscribeAckReason::ImplementationSpecificError => 131,
119 SubscribeAckReason::NotAuthorized => 135,
120 SubscribeAckReason::TopicFilterInvalid => 143,
121 SubscribeAckReason::PacketIdentifierInUse => 145,
122 SubscribeAckReason::QuotaExceeded => 151,
123 SubscribeAckReason::SharedSubscriptionNotSupported => 158,
124 SubscribeAckReason::SubscriptionIdentifiersNotSupported => 161,
125 SubscribeAckReason::WildcardSubscriptionsNotSupported => 162,
126 }
127 }
128}
129
// Reason codes carried in an UNSUBACK payload (one byte per entry of
// `UnsubscribeAck::status`). The numeric values are the on-wire codes and
// match the `From<UnsubscribeAckReason> for u8` conversion.
prim_enum! {
    #[derive(Deserialize, Serialize)]
    pub enum UnsubscribeAckReason {
        Success = 0,
        NoSubscriptionExisted = 17,
        // Codes from 128 upwards: the names describe failures.
        UnspecifiedError = 128,
        ImplementationSpecificError = 131,
        NotAuthorized = 135,
        TopicFilterInvalid = 143,
        PacketIdentifierInUse = 145
    }
}
143
144impl From<UnsubscribeAckReason> for u8 {
145 fn from(v: UnsubscribeAckReason) -> Self {
146 match v {
147 UnsubscribeAckReason::Success => 0,
148 UnsubscribeAckReason::NoSubscriptionExisted => 17,
149 UnsubscribeAckReason::UnspecifiedError => 128,
150 UnsubscribeAckReason::ImplementationSpecificError => 131,
151 UnsubscribeAckReason::NotAuthorized => 135,
152 UnsubscribeAckReason::TopicFilterInvalid => 143,
153 UnsubscribeAckReason::PacketIdentifierInUse => 145,
154 }
155 }
156}
157
158impl Subscribe {
159 pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
160 let packet_id = NonZeroU16::decode(src)?;
161 let prop_src = &mut utils::take_properties(src)?;
162 let mut sub_id = None;
163 let mut user_properties = Vec::new();
164 while prop_src.has_remaining() {
165 let prop_id = prop_src.get_u8();
166 match prop_id {
167 pt::SUB_ID => {
168 ensure!(sub_id.is_none(), DecodeError::MalformedPacket); let val = utils::decode_variable_length_cursor(prop_src)?;
170 sub_id = Some(NonZeroU32::new(val).ok_or(DecodeError::MalformedPacket)?);
171 }
172 pt::USER => user_properties.push(UserProperty::decode(prop_src)?),
173 _ => return Err(DecodeError::MalformedPacket),
174 }
175 }
176
177 let mut topic_filters = Vec::new();
178 while src.has_remaining() {
179 let topic = ByteString::decode(src)?;
180 let opts = SubscriptionOptions::decode(src)?;
181 topic_filters.push((topic, opts));
182 }
183
184 Ok(Self { packet_id, id: sub_id, user_properties, topic_filters })
185 }
186}
187
188impl SubscribeAck {
189 pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
190 let packet_id = NonZeroU16::decode(src)?;
191 let (properties, reason_string) = ack_props::decode(src)?;
192 let mut status = Vec::with_capacity(src.remaining());
193 for code in src.as_ref().iter().copied() {
194 status.push(code.try_into()?);
195 }
196 Ok(Self { packet_id, properties, reason_string, status })
197 }
198}
199
200impl Unsubscribe {
201 pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
202 let packet_id = NonZeroU16::decode(src)?;
203
204 let prop_src = &mut utils::take_properties(src)?;
205 let mut user_properties = Vec::new();
206 while prop_src.has_remaining() {
207 let prop_id = prop_src.get_u8();
208 match prop_id {
209 pt::USER => user_properties.push(UserProperty::decode(prop_src)?),
210 _ => return Err(DecodeError::MalformedPacket),
211 }
212 }
213
214 let mut topic_filters = Vec::new();
215 while src.remaining() > 0 {
216 topic_filters.push(ByteString::decode(src)?);
217 }
218
219 Ok(Self { packet_id, user_properties, topic_filters })
220 }
221}
222
223impl UnsubscribeAck {
224 pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
225 let packet_id = NonZeroU16::decode(src)?;
226 let (properties, reason_string) = ack_props::decode(src)?;
227 let mut status = Vec::with_capacity(src.remaining());
228 for code in src.as_ref().iter().copied() {
229 status.push(code.try_into()?);
230 }
231 Ok(Self { packet_id, properties, reason_string, status })
232 }
233}
234
235impl EncodeLtd for Subscribe {
236 fn encoded_size(&self, _limit: u32) -> usize {
237 let prop_len = self.id.map_or(0, |v| 1 + var_int_len(v.get() as usize) as usize) + self.user_properties.encoded_size();
239 let payload_len =
240 self.topic_filters.iter().fold(0, |acc, (filter, _opts)| acc + filter.encoded_size() + 1);
241 self.packet_id.encoded_size() + var_int_len(prop_len) as usize + prop_len + payload_len
242 }
243
244 fn encode(&self, buf: &mut BytesMut, _: u32) -> Result<(), EncodeError> {
245 self.packet_id.encode(buf)?;
246
247 let prop_len = self.id.map_or(0, |v| 1 + var_int_len(v.get() as usize))
249 + self.user_properties.encoded_size() as u32; utils::write_variable_length(prop_len, buf);
251
252 if let Some(id) = self.id {
253 buf.put_u8(pt::SUB_ID);
254 write_variable_length(id.get(), buf);
255 }
256
257 self.user_properties.encode(buf)?;
258
259 for (filter, opts) in self.topic_filters.iter() {
261 filter.encode(buf)?;
262 opts.encode(buf)?;
263 }
264
265 Ok(())
266 }
267}
268
269impl Decode for SubscriptionOptions {
270 fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
271 ensure!(src.has_remaining(), DecodeError::InvalidLength);
272 let val = src.get_u8();
273 let qos = (val & 0b0000_0011).try_into()?;
274 let retain_handling = ((val & 0b0011_0000) >> 4).try_into()?;
275 Ok(SubscriptionOptions {
276 qos,
277 no_local: val & 0b0000_0100 != 0,
278 retain_as_published: val & 0b0000_1000 != 0,
279 retain_handling,
280 })
281 }
282}
283
284impl Encode for SubscriptionOptions {
285 fn encoded_size(&self) -> usize {
286 1
287 }
288 fn encode(&self, buf: &mut BytesMut) -> Result<(), EncodeError> {
289 buf.put_u8(
290 u8::from(self.qos)
291 | ((self.no_local as u8) << 2)
292 | ((self.retain_as_published as u8) << 3)
293 | (u8::from(self.retain_handling) << 4),
294 );
295 Ok(())
296 }
297}
298
299impl EncodeLtd for SubscribeAck {
300 fn encoded_size(&self, limit: u32) -> usize {
301 let len = self.status.len();
302 if len > (u32::MAX - 2) as usize {
303 return usize::MAX; }
305
306 2 + ack_props::encoded_size(&self.properties, &self.reason_string, limit - 2 - len as u32) + len
307 }
308
309 fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
310 self.packet_id.encode(buf)?;
311 let len = self.status.len() as u32; ack_props::encode(&self.properties, &self.reason_string, buf, size - 2 - len)?;
313 for &reason in self.status.iter() {
314 buf.put_u8(reason.into());
315 }
316 Ok(())
317 }
318}
319
320impl EncodeLtd for Unsubscribe {
321 fn encoded_size(&self, _limit: u32) -> usize {
322 let prop_len = self.user_properties.encoded_size();
323 2 + var_int_len(prop_len) as usize
324 + prop_len
325 + self.topic_filters.iter().fold(0, |acc, filter| acc + 2 + filter.len())
326 }
327
328 fn encode(&self, buf: &mut BytesMut, _size: u32) -> Result<(), EncodeError> {
329 self.packet_id.encode(buf)?;
330
331 let prop_len = self.user_properties.encoded_size();
333 utils::write_variable_length(prop_len as u32, buf); self.user_properties.encode(buf)?;
335
336 for filter in self.topic_filters.iter() {
338 filter.encode(buf)?;
339 }
340 Ok(())
341 }
342}
343
344impl EncodeLtd for UnsubscribeAck {
345 fn encoded_size(&self, limit: u32) -> usize {
347 let len = self.status.len();
348 2 + len + ack_props::encoded_size(&self.properties, &self.reason_string, reduce_limit(limit, 2 + len))
349 }
350
351 fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
352 self.packet_id.encode(buf)?;
353 let len = self.status.len() as u32;
354
355 ack_props::encode(&self.properties, &self.reason_string, buf, size - 2 - len)?;
356 for &reason in self.status.iter() {
357 buf.put_u8(reason.into());
358 }
359 Ok(())
360 }
361}
362
#[cfg(test)]
mod tests {
    use super::*;
    use crate::v5::{Codec, Packet};
    use tokio_util::codec::{Decoder, Encoder};

    /// Encodes `pkt` using its own reported size and returns the buffer
    /// together with that size.
    fn encode_ltd<P: EncodeLtd>(pkt: &P) -> (BytesMut, usize) {
        let size = pkt.encoded_size(99999);
        let mut buf = BytesMut::with_capacity(size);
        pkt.encode(&mut buf, size as u32).unwrap();
        (buf, size)
    }

    // Round-trips SUBSCRIBE and UNSUBSCRIBE bodies and checks the reported size.
    #[test]
    fn test_sub() {
        let sub = Subscribe {
            packet_id: NonZeroU16::new(12).unwrap(),
            id: NonZeroU32::new(10),
            user_properties: vec![("a".into(), "1".into())],
            topic_filters: vec![("test".into(), SubscriptionOptions::default())],
        };
        let (buf, size) = encode_ltd(&sub);
        assert_eq!(buf.len(), size);
        assert_eq!(sub, Subscribe::decode(&mut buf.freeze()).unwrap());

        let unsub = Unsubscribe {
            packet_id: NonZeroU16::new(12).unwrap(),
            user_properties: vec![("a".into(), "1".into())],
            topic_filters: vec!["test".into()],
        };
        let (buf, size) = encode_ltd(&unsub);
        assert_eq!(buf.len(), size);
        assert_eq!(unsub, Unsubscribe::decode(&mut buf.freeze()).unwrap());
    }

    // Round-trips a SUBSCRIBE packet through the full codec.
    #[test]
    fn test_sub_pkt() {
        let pkt = Packet::Subscribe(Subscribe {
            packet_id: NonZeroU16::new(12).unwrap(),
            id: None,
            user_properties: vec![("a".into(), "1".into())],
            topic_filters: vec![("test".into(), SubscriptionOptions::default())],
        });
        let mut codec = Codec::default();

        let mut buf = BytesMut::new();
        codec.encode(pkt.clone(), &mut buf).unwrap();

        let (decoded, _) = codec.decode(&mut buf).unwrap().unwrap();
        assert_eq!(pkt, decoded);
    }

    // Round-trips SUBACK and UNSUBACK bodies, with and without reason codes.
    #[test]
    fn test_sub_ack() {
        let ack = SubscribeAck {
            packet_id: NonZeroU16::new(1).unwrap(),
            properties: Vec::new(),
            reason_string: Some("some reason".into()),
            status: Vec::new(),
        };
        let (buf, _) = encode_ltd(&ack);
        assert_eq!(ack, SubscribeAck::decode(&mut buf.freeze()).unwrap());

        let ack = SubscribeAck {
            packet_id: NonZeroU16::new(1).unwrap(),
            properties: vec![("prop1".into(), "val1".into()), ("prop2".into(), "val2".into())],
            reason_string: None,
            status: vec![SubscribeAckReason::GrantedQos0],
        };
        let (buf, _) = encode_ltd(&ack);
        assert_eq!(ack, SubscribeAck::decode(&mut buf.freeze()).unwrap());

        let ack = UnsubscribeAck {
            packet_id: NonZeroU16::new(1).unwrap(),
            properties: Vec::new(),
            reason_string: Some("some reason".into()),
            status: Vec::new(),
        };
        let (buf, _) = encode_ltd(&ack);
        assert_eq!(ack, UnsubscribeAck::decode(&mut buf.freeze()).unwrap());

        let ack = UnsubscribeAck {
            packet_id: NonZeroU16::new(1).unwrap(),
            properties: vec![("prop1".into(), "val1".into()), ("prop2".into(), "val2".into())],
            reason_string: None,
            status: vec![UnsubscribeAckReason::Success],
        };
        let (buf, _) = encode_ltd(&ack);
        assert_eq!(ack, UnsubscribeAck::decode(&mut buf.freeze()).unwrap());
    }
}