mqute_codec/protocol/v5/
unsubscribe.rs1use crate::Error;
8use crate::codec::util::decode_byte;
9use crate::codec::{Decode, Encode, RawPacket};
10use crate::protocol::v5::property::{
11 Property, PropertyFrame, property_decode, property_encode, property_len,
12};
13use crate::protocol::v5::util::id_header;
14use crate::protocol::{FixedHeader, Flags, PacketType, QoS, TopicFilters};
15use bytes::{Buf, Bytes, BytesMut};
16
17#[derive(Debug, Clone, PartialEq, Eq)]
32pub struct UnsubscribeProperties {
33 pub user_properties: Vec<(String, String)>,
35}
36
37impl PropertyFrame for UnsubscribeProperties {
38 fn encoded_len(&self) -> usize {
40 let mut len = 0usize;
41 len += property_len!(&self.user_properties);
42 len
43 }
44
45 fn encode(&self, buf: &mut BytesMut) {
47 property_encode!(&self.user_properties, Property::UserProp, buf);
48 }
49
50 fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error>
52 where
53 Self: Sized,
54 {
55 if buf.is_empty() {
56 return Ok(None);
57 }
58
59 let mut user_properties: Vec<(String, String)> = Vec::new();
60
61 while buf.has_remaining() {
62 let property: Property = decode_byte(buf)?.try_into()?;
63 match property {
64 Property::UserProp => {
65 property_decode!(&mut user_properties, buf);
66 }
67 _ => return Err(Error::PropertyMismatch),
68 }
69 }
70
71 Ok(Some(UnsubscribeProperties { user_properties }))
72 }
73}
74
75id_header!(UnsubscribeHeader, UnsubscribeProperties);
77
78#[derive(Debug, Clone, PartialEq, Eq)]
114pub struct Unsubscribe {
115 header: UnsubscribeHeader,
116 filters: TopicFilters,
117}
118
119impl Unsubscribe {
120 pub fn new<T: IntoIterator<Item: Into<String>>>(
122 packet_id: u16,
123 properties: Option<UnsubscribeProperties>,
124 filters: T,
125 ) -> Self {
126 let filters: Vec<String> = filters.into_iter().map(|x| x.into()).collect();
127
128 Unsubscribe {
129 header: UnsubscribeHeader::new(packet_id, properties),
130 filters: filters.into(),
131 }
132 }
133
134 pub fn packet_id(&self) -> u16 {
136 self.header.packet_id
137 }
138
139 pub fn properties(&self) -> Option<UnsubscribeProperties> {
141 self.header.properties.clone()
142 }
143
144 pub fn filters(&self) -> TopicFilters {
146 self.filters.clone()
147 }
148}
149
150impl Encode for Unsubscribe {
151 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
153 let header = FixedHeader::with_flags(
154 PacketType::Unsubscribe,
155 Flags::new(QoS::AtLeastOnce),
156 self.payload_len(),
157 );
158 header.encode(buf)?;
159
160 self.header.encode(buf)?;
161 self.filters.encode(buf);
162 Ok(())
163 }
164
165 fn payload_len(&self) -> usize {
167 self.header.encoded_len() + self.filters.encoded_len()
168 }
169}
170
171impl Decode for Unsubscribe {
172 fn decode(mut packet: RawPacket) -> Result<Self, Error> {
174 if packet.header.packet_type() != PacketType::Unsubscribe
176 || packet.header.flags() != Flags::new(QoS::AtLeastOnce)
177 {
178 return Err(Error::MalformedPacket);
179 }
180
181 let header = UnsubscribeHeader::decode(&mut packet.payload)?;
182 let filters = TopicFilters::decode(&mut packet.payload)?;
183
184 Ok(Unsubscribe::new(
185 header.packet_id,
186 header.properties,
187 filters,
188 ))
189 }
190}