1use crate::codec::util::{
8 decode_byte, decode_string, decode_variable_integer, encode_string, encode_variable_integer,
9};
10use crate::codec::{Decode, Encode, RawPacket};
11use crate::protocol::util::len_bytes;
12use crate::protocol::v5::property::{
13 property_decode, property_encode, property_len, Property, PropertyFrame,
14};
15use crate::protocol::v5::util::id_header;
16use crate::protocol::{FixedHeader, Flags, PacketType, QoS};
17use crate::Error;
18use bytes::{Buf, BufMut, Bytes, BytesMut};
19use std::borrow::Borrow;
20use std::ops::{Index, IndexMut};
21
/// Optional properties that accompany an MQTT v5 SUBSCRIBE packet.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeProperties {
    /// Subscription identifier attached to the subscription request, if any.
    pub subscription_id: Option<u32>,
    /// User property pairs, each stored as a `(name, value)` tuple.
    pub user_properties: Vec<(String, String)>,
}
45
46impl PropertyFrame for SubscribeProperties {
47 fn encoded_len(&self) -> usize {
49 let mut len = 0usize;
50
51 if let Some(value) = self.subscription_id {
52 len += 1 + len_bytes(value as usize);
53 }
54 len += property_len!(&self.user_properties);
55
56 len
57 }
58
59 fn encode(&self, buf: &mut BytesMut) {
61 if let Some(value) = self.subscription_id {
62 buf.put_u8(Property::SubscriptionIdentifier.into());
63 encode_variable_integer(buf, value).expect("");
64 }
65
66 property_encode!(&self.user_properties, Property::UserProp, buf);
67 }
68
69 fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error>
71 where
72 Self: Sized,
73 {
74 if buf.is_empty() {
75 return Ok(None);
76 }
77
78 let mut subscription_id: Option<u32> = None;
79 let mut user_properties: Vec<(String, String)> = Vec::new();
80
81 while buf.has_remaining() {
82 let property: Property = decode_byte(buf)?.try_into()?;
83 match property {
84 Property::SubscriptionIdentifier => {
85 if subscription_id.is_some() {
86 return Err(Error::ProtocolError);
87 }
88 subscription_id = Some(decode_variable_integer(buf)? as u32);
89 }
90 Property::UserProp => {
91 property_decode!(&mut user_properties, buf);
92 }
93 _ => return Err(Error::PropertyMismatch),
94 }
95 }
96
97 Ok(Some(SubscribeProperties {
98 subscription_id,
99 user_properties,
100 }))
101 }
102}
103
/// Retain handling option of a subscription, with the numeric value used on
/// the wire as the discriminant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
pub enum RetainHandling {
    /// Wire value 0.
    Send = 0,
    /// Wire value 1.
    SendForNewSub = 1,
    /// Wire value 2.
    DoNotSend = 2,
}
114
115impl TryFrom<u8> for RetainHandling {
116 type Error = Error;
117
118 fn try_from(value: u8) -> Result<Self, Self::Error> {
119 match value {
120 0 => Ok(RetainHandling::Send),
121 1 => Ok(RetainHandling::SendForNewSub),
122 2 => Ok(RetainHandling::DoNotSend),
123 n => Err(Error::InvalidRetainHandling(n)),
124 }
125 }
126}
127
128impl From<RetainHandling> for u8 {
129 fn from(value: RetainHandling) -> Self {
130 value as u8
131 }
132}
133
134#[derive(Debug, Clone, PartialEq, Eq)]
145pub struct TopicOptionFilter {
146 pub topic: String,
148 pub qos: QoS,
150 pub no_local: bool,
152 pub retain_as_published: bool,
154 pub retain_handling: RetainHandling,
156}
157
158impl TopicOptionFilter {
159 pub fn new<S: Into<String>>(
161 topic: S,
162 qos: QoS,
163 no_local: bool,
164 retain_as_published: bool,
165 retain_handling: RetainHandling,
166 ) -> Self {
167 TopicOptionFilter {
168 topic: topic.into(),
169 qos,
170 no_local,
171 retain_as_published,
172 retain_handling,
173 }
174 }
175}
176
177#[derive(Debug, Clone, PartialEq, Eq)]
193pub struct TopicOptionFilters(Vec<TopicOptionFilter>);
194
195#[allow(clippy::len_without_is_empty)]
196impl TopicOptionFilters {
197 pub fn new<T: IntoIterator<Item = TopicOptionFilter>>(filters: T) -> Self {
202 let values: Vec<TopicOptionFilter> = filters.into_iter().collect();
203
204 if values.is_empty() {
205 panic!("At least one topic filter is required");
206 }
207
208 TopicOptionFilters(values)
209 }
210
211 pub fn len(&self) -> usize {
213 self.0.len()
214 }
215
216 pub(crate) fn decode(payload: &mut Bytes) -> Result<Self, Error> {
218 let mut filters = Vec::with_capacity(1);
219
220 while payload.has_remaining() {
221 let topic = decode_string(payload)?;
222 let flags = decode_byte(payload)?;
223
224 if flags & 0b1100_0000 > 0 {
226 return Err(Error::MalformedPacket);
227 }
228
229 let qos = (flags & 0x03).try_into()?;
230 let no_local = flags & 0x04 != 0;
231 let retain_as_published = flags & 0x08 != 0;
232 let retain_handling = ((flags >> 4) & 0x03).try_into()?;
233
234 filters.push(TopicOptionFilter::new(
235 topic,
236 qos,
237 no_local,
238 retain_as_published,
239 retain_handling,
240 ));
241 }
242
243 if filters.is_empty() {
244 return Err(Error::NoTopic);
245 }
246
247 Ok(TopicOptionFilters(filters))
248 }
249
250 pub(crate) fn encode(&self, buf: &mut BytesMut) {
252 self.0.iter().for_each(|f| {
253 let qos: u8 = f.qos.into();
254 let retain_handling: u8 = f.retain_handling.into();
255
256 let options: u8 = retain_handling << 4
257 | (f.retain_as_published as u8) << 3
258 | (f.no_local as u8) << 2
259 | qos;
260
261 encode_string(buf, &f.topic);
262 buf.put_u8(options);
263 });
264 }
265
266 pub(crate) fn encoded_len(&self) -> usize {
267 self.0.iter().fold(0, |acc, f| acc + 2 + f.topic.len() + 1)
268 }
269}
270
271impl AsRef<Vec<TopicOptionFilter>> for TopicOptionFilters {
273 #[inline]
274 fn as_ref(&self) -> &Vec<TopicOptionFilter> {
275 &self.0
276 }
277}
278
279impl Borrow<Vec<TopicOptionFilter>> for TopicOptionFilters {
280 fn borrow(&self) -> &Vec<TopicOptionFilter> {
281 &self.0
282 }
283}
284
285impl IntoIterator for TopicOptionFilters {
286 type Item = TopicOptionFilter;
287 type IntoIter = std::vec::IntoIter<TopicOptionFilter>;
288
289 fn into_iter(self) -> Self::IntoIter {
290 self.0.into_iter()
291 }
292}
293
294impl FromIterator<TopicOptionFilter> for TopicOptionFilters {
295 fn from_iter<T: IntoIterator<Item = TopicOptionFilter>>(iter: T) -> Self {
296 TopicOptionFilters(Vec::from_iter(iter))
297 }
298}
299
300impl From<TopicOptionFilters> for Vec<TopicOptionFilter> {
301 #[inline]
302 fn from(value: TopicOptionFilters) -> Self {
303 value.0
304 }
305}
306
307impl From<Vec<TopicOptionFilter>> for TopicOptionFilters {
308 #[inline]
309 fn from(value: Vec<TopicOptionFilter>) -> Self {
310 TopicOptionFilters(value)
311 }
312}
313
314impl Index<usize> for TopicOptionFilters {
315 type Output = TopicOptionFilter;
316
317 fn index(&self, index: usize) -> &Self::Output {
318 self.0.index(index)
319 }
320}
321
322impl IndexMut<usize> for TopicOptionFilters {
323 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
324 self.0.index_mut(index)
325 }
326}
327
328id_header!(SubscribeHeader, SubscribeProperties);
330
331#[derive(Debug, Clone, PartialEq, Eq)]
376pub struct Subscribe {
377 header: SubscribeHeader,
378 filters: TopicOptionFilters,
379}
380
381impl Subscribe {
382 pub fn new<T: IntoIterator<Item = TopicOptionFilter>>(
384 packet_id: u16,
385 properties: Option<SubscribeProperties>,
386 filters: T,
387 ) -> Self {
388 let header = SubscribeHeader::new(packet_id, properties);
389 let filters = TopicOptionFilters::new(filters);
390
391 Subscribe { header, filters }
392 }
393
394 pub fn packet_id(&self) -> u16 {
396 self.header.packet_id
397 }
398
399 pub fn properties(&self) -> Option<SubscribeProperties> {
401 self.header.properties.clone()
402 }
403
404 pub fn filters(&self) -> TopicOptionFilters {
406 self.filters.clone()
407 }
408}
409
410impl Encode for Subscribe {
411 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
413 let header = FixedHeader::with_flags(
414 PacketType::Subscribe,
415 Flags::new(QoS::AtLeastOnce),
416 self.payload_len(),
417 );
418 header.encode(buf)?;
419
420 self.header.encode(buf)?;
421 self.filters.encode(buf);
422
423 Ok(())
424 }
425
426 fn payload_len(&self) -> usize {
428 self.header.encoded_len() + self.filters.encoded_len()
429 }
430}
431
432impl Decode for Subscribe {
433 fn decode(mut packet: RawPacket) -> Result<Self, Error> {
435 if packet.header.packet_type() != PacketType::Subscribe
437 || packet.header.flags() != Flags::new(QoS::AtLeastOnce)
438 {
439 return Err(Error::MalformedPacket);
440 }
441
442 let header = SubscribeHeader::decode(&mut packet.payload)?;
443 let filters = TopicOptionFilters::decode(&mut packet.payload)?;
444
445 Ok(Subscribe::new(header.packet_id, header.properties, filters))
446 }
447}