1use std::num::{NonZeroU16, NonZeroU32};
2
3use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut};
4
5use super::ack_props;
6use crate::error::{DecodeError, EncodeError};
7use crate::types::QoS;
8use crate::utils::{self, Decode, Encode, write_variable_length};
9use crate::v5::codec::{UserProperties, UserProperty, encode::*, property_type as pt};
10
/// MQTT v5 SUBSCRIBE packet.
///
/// Holds the packet identifier, the properties recognised by the decoder
/// (subscription identifier and user properties) and the topic filters the
/// sender wants to subscribe to.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Subscribe {
    /// Packet identifier; the type guarantees it is never zero.
    pub packet_id: NonZeroU16,
    /// Subscription identifier taken from the `SUB_ID` property, or `None`
    /// when the property is absent. The decoder rejects a zero value and a
    /// repeated property.
    pub id: Option<NonZeroU32>,
    /// User properties attached to the packet, in wire order.
    pub user_properties: UserProperties,
    /// Topic filters, each paired with the subscription options requested for it.
    pub topic_filters: Vec<(ByteString, SubscriptionOptions)>,
}
22
/// Options attached to a single topic filter of a SUBSCRIBE packet.
///
/// On the wire all four fields are packed into one byte.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct SubscriptionOptions {
    /// Requested QoS (bits 0-1 of the options byte).
    pub qos: QoS,
    /// "No local" flag (bit 2 of the options byte).
    pub no_local: bool,
    /// "Retain as published" flag (bit 3 of the options byte).
    pub retain_as_published: bool,
    /// Retain handling mode (bits 4-5 of the options byte).
    pub retain_handling: RetainHandling,
}
30
31impl Default for SubscriptionOptions {
32 fn default() -> Self {
33 Self {
34 qos: QoS::AtMostOnce,
35 no_local: false,
36 retain_as_published: false,
37 retain_handling: RetainHandling::AtSubscribe,
38 }
39 }
40}
41
// Retain handling mode of a subscription.
//
// The discriminants are the values stored in bits 4-5 of the subscription
// options byte; any other value fails the `u8` -> `RetainHandling` conversion
// used by the decoder.
// NOTE(review): per the MQTT v5 spec these presumably mean "send retained
// messages on subscribe", "send only for a new subscription" and "do not
// send" -- confirm against the spec before relying on the wording.
prim_enum! {
    pub enum RetainHandling {
        AtSubscribe = 0,
        AtSubscribeNew = 1,
        NoAtSubscribe = 2
    }
}
49
/// MQTT v5 SUBACK packet.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SubscribeAck {
    /// Packet identifier; the type guarantees it is never zero.
    pub packet_id: NonZeroU16,
    /// User properties attached to the acknowledgement.
    pub properties: UserProperties,
    /// Optional reason string.
    pub reason_string: Option<ByteString>,
    /// Reason codes from the payload, one per payload byte, in wire order.
    pub status: Vec<SubscribeAckReason>,
}
59
/// MQTT v5 UNSUBSCRIBE packet.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Unsubscribe {
    /// Packet identifier; the type guarantees it is never zero.
    pub packet_id: NonZeroU16,
    /// User properties attached to the packet; the decoder accepts no other
    /// property in this packet.
    pub user_properties: UserProperties,
    /// Topic filters to unsubscribe from, in wire order.
    pub topic_filters: Vec<ByteString>,
}
69
/// MQTT v5 UNSUBACK packet.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct UnsubscribeAck {
    /// Packet identifier; the type guarantees it is never zero.
    pub packet_id: NonZeroU16,
    /// User properties attached to the acknowledgement.
    pub properties: UserProperties,
    /// Optional reason string.
    pub reason_string: Option<ByteString>,
    /// Reason codes from the payload, one per payload byte, in wire order.
    pub status: Vec<UnsubscribeAckReason>,
}
79
// Reason codes carried in the payload of a SUBACK packet, one byte each.
//
// The discriminants are the on-wire values. Values 0-2 name the granted QoS;
// the remaining values (128 and above) are named after failure conditions.
// A byte that matches no variant fails the `u8` conversion used by
// `SubscribeAck::decode`.
prim_enum! {
    pub enum SubscribeAckReason {
        GrantedQos0 = 0,
        GrantedQos1 = 1,
        GrantedQos2 = 2,
        UnspecifiedError = 128,
        ImplementationSpecificError = 131,
        NotAuthorized = 135,
        TopicFilterInvalid = 143,
        PacketIdentifierInUse = 145,
        QuotaExceeded = 151,
        SharedSubscriptionNotSupported = 158,
        SubscriptionIdentifiersNotSupported = 161,
        WildcardSubscriptionsNotSupported = 162
    }
}
97
// Reason codes carried in the payload of an UNSUBACK packet, one byte each.
//
// The discriminants are the on-wire values. A byte that matches no variant
// fails the `u8` conversion used by `UnsubscribeAck::decode`.
prim_enum! {
    pub enum UnsubscribeAckReason {
        Success = 0,
        NoSubscriptionExisted = 17,
        UnspecifiedError = 128,
        ImplementationSpecificError = 131,
        NotAuthorized = 135,
        TopicFilterInvalid = 143,
        PacketIdentifierInUse = 145
    }
}
110
111impl Subscribe {
112 pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
113 let packet_id = NonZeroU16::decode(src)?;
114 let prop_src = &mut utils::take_properties(src)?;
115 let mut sub_id = None;
116 let mut user_properties = Vec::new();
117 while prop_src.has_remaining() {
118 let prop_id = prop_src.get_u8();
119 match prop_id {
120 pt::SUB_ID => {
121 ensure!(sub_id.is_none(), DecodeError::MalformedPacket); let val = utils::decode_variable_length_cursor(prop_src)?;
123 sub_id = Some(NonZeroU32::new(val).ok_or(DecodeError::MalformedPacket)?);
124 }
125 pt::USER => user_properties.push(UserProperty::decode(prop_src)?),
126 _ => return Err(DecodeError::MalformedPacket),
127 }
128 }
129
130 let mut topic_filters = Vec::new();
131 while src.has_remaining() {
132 let topic = ByteString::decode(src)?;
133 let opts = SubscriptionOptions::decode(src)?;
134 topic_filters.push((topic, opts));
135 }
136
137 Ok(Self { packet_id, id: sub_id, user_properties, topic_filters })
138 }
139}
140
141impl SubscribeAck {
142 pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
143 let packet_id = NonZeroU16::decode(src)?;
144 let (properties, reason_string) = ack_props::decode(src)?;
145 let mut status = Vec::with_capacity(src.remaining());
146 for code in src.as_ref().iter().copied() {
147 status.push(code.try_into()?);
148 }
149 Ok(Self { packet_id, properties, reason_string, status })
150 }
151}
152
153impl Unsubscribe {
154 pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
155 let packet_id = NonZeroU16::decode(src)?;
156
157 let prop_src = &mut utils::take_properties(src)?;
158 let mut user_properties = Vec::new();
159 while prop_src.has_remaining() {
160 let prop_id = prop_src.get_u8();
161 match prop_id {
162 pt::USER => user_properties.push(UserProperty::decode(prop_src)?),
163 _ => return Err(DecodeError::MalformedPacket),
164 }
165 }
166
167 let mut topic_filters = Vec::new();
168 while src.remaining() > 0 {
169 topic_filters.push(ByteString::decode(src)?);
170 }
171
172 Ok(Self { packet_id, user_properties, topic_filters })
173 }
174}
175
176impl UnsubscribeAck {
177 pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
178 let packet_id = NonZeroU16::decode(src)?;
179 let (properties, reason_string) = ack_props::decode(src)?;
180 let mut status = Vec::with_capacity(src.remaining());
181 for code in src.as_ref().iter().copied() {
182 status.push(code.try_into()?);
183 }
184 Ok(Self { packet_id, properties, reason_string, status })
185 }
186}
187
188impl EncodeLtd for Subscribe {
189 fn encoded_size(&self, _limit: u32) -> usize {
190 let prop_len = self.id.map_or(0, |v| 1 + var_int_len(v.get() as usize) as usize) + self.user_properties.encoded_size();
192 let payload_len = self
193 .topic_filters
194 .iter()
195 .fold(0, |acc, (filter, _opts)| acc + filter.encoded_size() + 1);
196 self.packet_id.encoded_size() + var_int_len(prop_len) as usize + prop_len + payload_len
197 }
198
199 fn encode(&self, buf: &mut BytesMut, _: u32) -> Result<(), EncodeError> {
200 self.packet_id.encode(buf)?;
201
202 let prop_len = self.id.map_or(0, |v| 1 + var_int_len(v.get() as usize))
204 + self.user_properties.encoded_size() as u32; utils::write_variable_length(prop_len, buf);
206
207 if let Some(id) = self.id {
208 buf.put_u8(pt::SUB_ID);
209 write_variable_length(id.get(), buf);
210 }
211
212 self.user_properties.encode(buf)?;
213
214 for (filter, opts) in self.topic_filters.iter() {
216 filter.encode(buf)?;
217 opts.encode(buf)?;
218 }
219
220 Ok(())
221 }
222}
223
224impl Decode for SubscriptionOptions {
225 fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
226 ensure!(src.has_remaining(), DecodeError::InvalidLength);
227 let val = src.get_u8();
228 let qos = (val & 0b0000_0011).try_into()?;
229 let retain_handling = ((val & 0b0011_0000) >> 4).try_into()?;
230 Ok(SubscriptionOptions {
231 qos,
232 no_local: val & 0b0000_0100 != 0,
233 retain_as_published: val & 0b0000_1000 != 0,
234 retain_handling,
235 })
236 }
237}
238
239impl Encode for SubscriptionOptions {
240 fn encoded_size(&self) -> usize {
241 1
242 }
243 fn encode(&self, buf: &mut BytesMut) -> Result<(), EncodeError> {
244 buf.put_u8(
245 u8::from(self.qos)
246 | ((self.no_local as u8) << 2)
247 | ((self.retain_as_published as u8) << 3)
248 | (u8::from(self.retain_handling) << 4),
249 );
250 Ok(())
251 }
252}
253
254impl EncodeLtd for SubscribeAck {
255 fn encoded_size(&self, limit: u32) -> usize {
256 let len = self.status.len();
257 if len > (u32::MAX - 2) as usize {
258 return usize::MAX; }
260
261 2 + ack_props::encoded_size(
262 &self.properties,
263 &self.reason_string,
264 limit - 2 - len as u32,
265 ) + len
266 }
267
268 fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
269 self.packet_id.encode(buf)?;
270 let len = self.status.len() as u32; ack_props::encode(&self.properties, &self.reason_string, buf, size - 2 - len)?;
272 for &reason in self.status.iter() {
273 buf.put_u8(reason.into());
274 }
275 Ok(())
276 }
277}
278
279impl EncodeLtd for Unsubscribe {
280 fn encoded_size(&self, _limit: u32) -> usize {
281 let prop_len = self.user_properties.encoded_size();
282 2 + var_int_len(prop_len) as usize
283 + prop_len
284 + self.topic_filters.iter().fold(0, |acc, filter| acc + 2 + filter.len())
285 }
286
287 fn encode(&self, buf: &mut BytesMut, _size: u32) -> Result<(), EncodeError> {
288 self.packet_id.encode(buf)?;
289
290 let prop_len = self.user_properties.encoded_size();
292 utils::write_variable_length(prop_len as u32, buf); self.user_properties.encode(buf)?;
294
295 for filter in self.topic_filters.iter() {
297 filter.encode(buf)?;
298 }
299 Ok(())
300 }
301}
302
303impl EncodeLtd for UnsubscribeAck {
304 fn encoded_size(&self, limit: u32) -> usize {
306 let len = self.status.len();
307 2 + len
308 + ack_props::encoded_size(
309 &self.properties,
310 &self.reason_string,
311 reduce_limit(limit, 2 + len),
312 )
313 }
314
315 fn encode(&self, buf: &mut BytesMut, size: u32) -> Result<(), EncodeError> {
316 self.packet_id.encode(buf)?;
317 let len = self.status.len() as u32;
318
319 ack_props::encode(&self.properties, &self.reason_string, buf, size - 2 - len)?;
320 for &reason in self.status.iter() {
321 buf.put_u8(reason.into());
322 }
323 Ok(())
324 }
325}
326
#[cfg(test)]
mod tests {
    use ntex_codec::{Decoder, Encoder};

    use super::super::super::{Codec, Decoded, Packet};
    use super::*;

    /// Extracts the packet from a decoder result; panics for any other variant.
    fn packet(res: Decoded) -> Packet {
        if let Decoded::Packet(pkt, _) = res {
            pkt
        } else {
            panic!()
        }
    }

    /// SUBSCRIBE and UNSUBSCRIBE survive an encode/decode round trip and
    /// `encoded_size` matches the number of bytes written.
    #[test]
    fn test_sub() {
        {
            let subscribe = Subscribe {
                packet_id: NonZeroU16::new(12).unwrap(),
                id: Some(10.try_into().unwrap()),
                user_properties: vec![("a".into(), "1".into())],
                topic_filters: vec![("test".into(), SubscriptionOptions::default())],
            };

            let expected_len = subscribe.encoded_size(99999);
            let mut out = BytesMut::with_capacity(expected_len);
            subscribe.encode(&mut out, expected_len as u32).unwrap();
            assert_eq!(out.len(), expected_len);

            let mut wire = out.freeze();
            assert_eq!(subscribe, Subscribe::decode(&mut wire).unwrap());
        }

        {
            let unsubscribe = Unsubscribe {
                packet_id: NonZeroU16::new(12).unwrap(),
                user_properties: vec![("a".into(), "1".into())],
                topic_filters: vec!["test".into()],
            };

            let expected_len = unsubscribe.encoded_size(99999);
            let mut out = BytesMut::with_capacity(expected_len);
            unsubscribe.encode(&mut out, expected_len as u32).unwrap();
            assert_eq!(out.len(), expected_len);

            let mut wire = out.freeze();
            assert_eq!(unsubscribe, Unsubscribe::decode(&mut wire).unwrap());
        }
    }

    /// A SUBSCRIBE without subscription id round-trips through the full codec.
    #[test]
    fn test_sub_pkt() {
        let codec = Codec::new();
        let original = Packet::Subscribe(Subscribe {
            packet_id: NonZeroU16::new(12).unwrap(),
            id: None,
            user_properties: vec![("a".into(), "1".into())],
            topic_filters: vec![("test".into(), SubscriptionOptions::default())],
        });

        let mut wire = BytesMut::new();
        codec.encode(original.clone().into(), &mut wire).unwrap();

        let decoded = packet(codec.decode(&mut wire).unwrap().unwrap());
        assert_eq!(original, decoded);
    }

    /// SUBACK and UNSUBACK round-trip with a reason string only and with
    /// user properties plus one reason code.
    #[test]
    fn test_sub_ack() {
        // SUBACK: reason string, no reason codes
        {
            let suback = SubscribeAck {
                packet_id: NonZeroU16::new(1).unwrap(),
                properties: Vec::new(),
                reason_string: Some("some reason".into()),
                status: Vec::new(),
            };

            let len = suback.encoded_size(99999);
            let mut out = BytesMut::with_capacity(len);
            suback.encode(&mut out, len as u32).unwrap();
            let mut wire = out.freeze();
            assert_eq!(suback, SubscribeAck::decode(&mut wire).unwrap());
        }

        // SUBACK: user properties and a single reason code
        {
            let suback = SubscribeAck {
                packet_id: NonZeroU16::new(1).unwrap(),
                properties: vec![("prop1".into(), "val1".into()), ("prop2".into(), "val2".into())],
                reason_string: None,
                status: vec![SubscribeAckReason::GrantedQos0],
            };

            let len = suback.encoded_size(99999);
            let mut out = BytesMut::with_capacity(len);
            suback.encode(&mut out, len as u32).unwrap();
            let mut wire = out.freeze();
            assert_eq!(suback, SubscribeAck::decode(&mut wire).unwrap());
        }

        // UNSUBACK: reason string, no reason codes
        {
            let unsuback = UnsubscribeAck {
                packet_id: NonZeroU16::new(1).unwrap(),
                properties: Vec::new(),
                reason_string: Some("some reason".into()),
                status: Vec::new(),
            };

            let mut out = BytesMut::new();
            let len = unsuback.encoded_size(99999);
            unsuback.encode(&mut out, len as u32).unwrap();
            let mut wire = out.freeze();
            assert_eq!(unsuback, UnsubscribeAck::decode(&mut wire).unwrap());
        }

        // UNSUBACK: user properties and a single reason code
        {
            let unsuback = UnsubscribeAck {
                packet_id: NonZeroU16::new(1).unwrap(),
                properties: vec![("prop1".into(), "val1".into()), ("prop2".into(), "val2".into())],
                reason_string: None,
                status: vec![UnsubscribeAckReason::Success],
            };

            let len = unsuback.encoded_size(99999);
            let mut out = BytesMut::with_capacity(len);
            unsuback.encode(&mut out, len as u32).unwrap();
            let mut wire = out.freeze();
            assert_eq!(unsuback, UnsubscribeAck::decode(&mut wire).unwrap());
        }
    }
}