1use crate::Error;
8use crate::codec::util::{
9 decode_byte, decode_string, decode_variable_integer, encode_string, encode_variable_integer,
10};
11use crate::codec::{Decode, Encode, RawPacket};
12use crate::protocol::util::len_bytes;
13use crate::protocol::v5::property::{
14 Property, PropertyFrame, property_decode, property_encode, property_len,
15};
16use crate::protocol::v5::util::id_header;
17use crate::protocol::{FixedHeader, Flags, PacketType, QoS, util};
18use bytes::{Buf, BufMut, Bytes, BytesMut};
19use std::borrow::Borrow;
20use std::ops::{Index, IndexMut};
21
22#[derive(Debug, Clone, PartialEq, Eq)]
39pub struct SubscribeProperties {
40 pub subscription_id: Option<u32>,
42 pub user_properties: Vec<(String, String)>,
44}
45
46impl PropertyFrame for SubscribeProperties {
47 fn encoded_len(&self) -> usize {
49 let mut len = 0usize;
50
51 if let Some(value) = self.subscription_id {
52 len += 1 + len_bytes(value as usize);
53 }
54 len += property_len!(&self.user_properties);
55
56 len
57 }
58
59 fn encode(&self, buf: &mut BytesMut) {
61 if let Some(value) = self.subscription_id {
62 buf.put_u8(Property::SubscriptionIdentifier.into());
63 encode_variable_integer(buf, value).expect("");
64 }
65
66 property_encode!(&self.user_properties, Property::UserProp, buf);
67 }
68
69 fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error>
71 where
72 Self: Sized,
73 {
74 if buf.is_empty() {
75 return Ok(None);
76 }
77
78 let mut subscription_id: Option<u32> = None;
79 let mut user_properties: Vec<(String, String)> = Vec::new();
80
81 while buf.has_remaining() {
82 let property: Property = decode_byte(buf)?.try_into()?;
83 match property {
84 Property::SubscriptionIdentifier => {
85 if subscription_id.is_some() {
86 return Err(Error::ProtocolError);
87 }
88 subscription_id = Some(decode_variable_integer(buf)? as u32);
89 }
90 Property::UserProp => {
91 property_decode!(&mut user_properties, buf);
92 }
93 _ => return Err(Error::PropertyMismatch),
94 }
95 }
96
97 Ok(Some(SubscribeProperties {
98 subscription_id,
99 user_properties,
100 }))
101 }
102}
103
104#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
106pub enum RetainHandling {
107 Send = 0,
109 SendForNewSub = 1,
111 DoNotSend = 2,
113}
114
115impl TryFrom<u8> for RetainHandling {
116 type Error = Error;
117
118 fn try_from(value: u8) -> Result<Self, Self::Error> {
119 match value {
120 0 => Ok(RetainHandling::Send),
121 1 => Ok(RetainHandling::SendForNewSub),
122 2 => Ok(RetainHandling::DoNotSend),
123 n => Err(Error::InvalidRetainHandling(n)),
124 }
125 }
126}
127
128impl From<RetainHandling> for u8 {
129 fn from(value: RetainHandling) -> Self {
130 value as u8
131 }
132}
133
134#[derive(Debug, Clone, PartialEq, Eq)]
145pub struct TopicOptionFilter {
146 pub topic: String,
148 pub qos: QoS,
150 pub no_local: bool,
152 pub retain_as_published: bool,
154 pub retain_handling: RetainHandling,
156}
157
158impl TopicOptionFilter {
159 pub fn new<S: Into<String>>(
165 topic: S,
166 qos: QoS,
167 no_local: bool,
168 retain_as_published: bool,
169 retain_handling: RetainHandling,
170 ) -> Self {
171 let topic = topic.into();
172
173 if !util::is_valid_topic_filter(&topic) {
174 panic!("Invalid topic filter: '{}'", topic);
175 }
176
177 TopicOptionFilter {
178 topic,
179 qos,
180 no_local,
181 retain_as_published,
182 retain_handling,
183 }
184 }
185}
186
187#[derive(Debug, Clone, PartialEq, Eq)]
203pub struct TopicOptionFilters(Vec<TopicOptionFilter>);
204
205#[allow(clippy::len_without_is_empty)]
206impl TopicOptionFilters {
207 pub fn new<T: IntoIterator<Item = TopicOptionFilter>>(filters: T) -> Self {
215 let values: Vec<TopicOptionFilter> = filters.into_iter().collect();
216
217 if values.is_empty() {
218 panic!("At least one topic filter is required");
219 }
220
221 TopicOptionFilters(values)
222 }
223
224 pub fn len(&self) -> usize {
226 self.0.len()
227 }
228
229 pub(crate) fn decode(payload: &mut Bytes) -> Result<Self, Error> {
231 let mut filters = Vec::with_capacity(1);
232
233 while payload.has_remaining() {
234 let topic = decode_string(payload)?;
235
236 if !util::is_valid_topic_filter(&topic) {
237 return Err(Error::InvalidTopicFilter(topic));
238 }
239
240 let flags = decode_byte(payload)?;
241
242 if flags & 0b1100_0000 > 0 {
244 return Err(Error::MalformedPacket);
245 }
246
247 let qos = (flags & 0x03).try_into()?;
248 let no_local = flags & 0x04 != 0;
249 let retain_as_published = flags & 0x08 != 0;
250 let retain_handling = ((flags >> 4) & 0x03).try_into()?;
251
252 filters.push(TopicOptionFilter::new(
253 topic,
254 qos,
255 no_local,
256 retain_as_published,
257 retain_handling,
258 ));
259 }
260
261 if filters.is_empty() {
262 return Err(Error::NoTopic);
263 }
264
265 Ok(TopicOptionFilters(filters))
266 }
267
268 pub(crate) fn encode(&self, buf: &mut BytesMut) {
270 self.0.iter().for_each(|f| {
271 let qos: u8 = f.qos.into();
272 let retain_handling: u8 = f.retain_handling.into();
273
274 let options: u8 = retain_handling << 4
275 | (f.retain_as_published as u8) << 3
276 | (f.no_local as u8) << 2
277 | qos;
278
279 encode_string(buf, &f.topic);
280 buf.put_u8(options);
281 });
282 }
283
284 pub(crate) fn encoded_len(&self) -> usize {
285 self.0.iter().fold(0, |acc, f| acc + 2 + f.topic.len() + 1)
286 }
287}
288
289impl AsRef<Vec<TopicOptionFilter>> for TopicOptionFilters {
291 #[inline]
292 fn as_ref(&self) -> &Vec<TopicOptionFilter> {
293 &self.0
294 }
295}
296
297impl Borrow<Vec<TopicOptionFilter>> for TopicOptionFilters {
298 fn borrow(&self) -> &Vec<TopicOptionFilter> {
299 &self.0
300 }
301}
302
303impl IntoIterator for TopicOptionFilters {
304 type Item = TopicOptionFilter;
305 type IntoIter = std::vec::IntoIter<TopicOptionFilter>;
306
307 fn into_iter(self) -> Self::IntoIter {
308 self.0.into_iter()
309 }
310}
311
312impl FromIterator<TopicOptionFilter> for TopicOptionFilters {
313 fn from_iter<T: IntoIterator<Item = TopicOptionFilter>>(iter: T) -> Self {
314 TopicOptionFilters(Vec::from_iter(iter))
315 }
316}
317
318impl From<TopicOptionFilters> for Vec<TopicOptionFilter> {
319 #[inline]
320 fn from(value: TopicOptionFilters) -> Self {
321 value.0
322 }
323}
324
325impl From<Vec<TopicOptionFilter>> for TopicOptionFilters {
326 #[inline]
327 fn from(value: Vec<TopicOptionFilter>) -> Self {
328 TopicOptionFilters(value)
329 }
330}
331
332impl Index<usize> for TopicOptionFilters {
333 type Output = TopicOptionFilter;
334
335 fn index(&self, index: usize) -> &Self::Output {
336 self.0.index(index)
337 }
338}
339
340impl IndexMut<usize> for TopicOptionFilters {
341 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
342 self.0.index_mut(index)
343 }
344}
345
346id_header!(SubscribeHeader, SubscribeProperties);
348
349#[derive(Debug, Clone, PartialEq, Eq)]
394pub struct Subscribe {
395 header: SubscribeHeader,
396 filters: TopicOptionFilters,
397}
398
399impl Subscribe {
400 pub fn new<T: IntoIterator<Item = TopicOptionFilter>>(
402 packet_id: u16,
403 properties: Option<SubscribeProperties>,
404 filters: T,
405 ) -> Self {
406 let header = SubscribeHeader::new(packet_id, properties);
407 let filters = TopicOptionFilters::new(filters);
408
409 Subscribe { header, filters }
410 }
411
412 pub fn packet_id(&self) -> u16 {
414 self.header.packet_id
415 }
416
417 pub fn properties(&self) -> Option<SubscribeProperties> {
419 self.header.properties.clone()
420 }
421
422 pub fn filters(&self) -> TopicOptionFilters {
424 self.filters.clone()
425 }
426}
427
428impl Encode for Subscribe {
429 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
431 let header = FixedHeader::with_flags(
432 PacketType::Subscribe,
433 Flags::new(QoS::AtLeastOnce),
434 self.payload_len(),
435 );
436 header.encode(buf)?;
437
438 self.header.encode(buf)?;
439 self.filters.encode(buf);
440
441 Ok(())
442 }
443
444 fn payload_len(&self) -> usize {
446 self.header.encoded_len() + self.filters.encoded_len()
447 }
448}
449
450impl Decode for Subscribe {
451 fn decode(mut packet: RawPacket) -> Result<Self, Error> {
453 if packet.header.packet_type() != PacketType::Subscribe
455 || packet.header.flags() != Flags::new(QoS::AtLeastOnce)
456 {
457 return Err(Error::MalformedPacket);
458 }
459
460 let header = SubscribeHeader::decode(&mut packet.payload)?;
461 let filters = TopicOptionFilters::decode(&mut packet.payload)?;
462
463 Ok(Subscribe::new(header.packet_id, header.properties, filters))
464 }
465}