mqute_codec/protocol/v5/
unsubscribe.rs1use crate::Error;
8use crate::codec::util::decode_byte;
9use crate::codec::{Decode, Encode, RawPacket};
10use crate::protocol::v5::property::{
11 Property, PropertyFrame, property_decode, property_encode, property_len,
12};
13use crate::protocol::v5::util::id_header;
14use crate::protocol::{FixedHeader, Flags, PacketType, QoS, TopicFilters, traits};
15use bytes::{Buf, Bytes, BytesMut};
16
17#[derive(Debug, Clone, PartialEq, Eq)]
32pub struct UnsubscribeProperties {
33 pub user_properties: Vec<(String, String)>,
35}
36
37impl PropertyFrame for UnsubscribeProperties {
38 fn encoded_len(&self) -> usize {
40 let mut len = 0usize;
41 len += property_len!(&self.user_properties);
42 len
43 }
44
45 fn encode(&self, buf: &mut BytesMut) {
47 property_encode!(&self.user_properties, Property::UserProp, buf);
48 }
49
50 fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error>
52 where
53 Self: Sized,
54 {
55 if buf.is_empty() {
56 return Ok(None);
57 }
58
59 let mut user_properties: Vec<(String, String)> = Vec::new();
60
61 while buf.has_remaining() {
62 let property: Property = decode_byte(buf)?.try_into()?;
63 match property {
64 Property::UserProp => {
65 property_decode!(&mut user_properties, buf);
66 }
67 _ => return Err(Error::PropertyMismatch),
68 }
69 }
70
71 Ok(Some(UnsubscribeProperties { user_properties }))
72 }
73}
74
75id_header!(UnsubscribeHeader, UnsubscribeProperties);
77
78#[derive(Debug, Clone, PartialEq, Eq)]
114pub struct Unsubscribe {
115 header: UnsubscribeHeader,
116 filters: TopicFilters,
117}
118
119impl Unsubscribe {
120 pub fn new<T: IntoIterator<Item: Into<String>>>(
126 packet_id: u16,
127 properties: Option<UnsubscribeProperties>,
128 filters: T,
129 ) -> Self {
130 let filters: Vec<String> = filters.into_iter().map(|x| x.into()).collect();
131
132 Unsubscribe {
133 header: UnsubscribeHeader::new(packet_id, properties),
134 filters: filters.into(),
135 }
136 }
137
138 pub fn packet_id(&self) -> u16 {
140 self.header.packet_id
141 }
142
143 pub fn properties(&self) -> Option<UnsubscribeProperties> {
145 self.header.properties.clone()
146 }
147
148 pub fn filters(&self) -> TopicFilters {
150 self.filters.clone()
151 }
152}
153
154impl Encode for Unsubscribe {
155 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
157 let header = FixedHeader::with_flags(
158 PacketType::Unsubscribe,
159 Flags::new(QoS::AtLeastOnce),
160 self.payload_len(),
161 );
162 header.encode(buf)?;
163
164 self.header.encode(buf)?;
165 self.filters.encode(buf);
166 Ok(())
167 }
168
169 fn payload_len(&self) -> usize {
171 self.header.encoded_len() + self.filters.encoded_len()
172 }
173}
174
175impl Decode for Unsubscribe {
176 fn decode(mut packet: RawPacket) -> Result<Self, Error> {
178 if packet.header.packet_type() != PacketType::Unsubscribe
180 || packet.header.flags() != Flags::new(QoS::AtLeastOnce)
181 {
182 return Err(Error::MalformedPacket);
183 }
184
185 let header = UnsubscribeHeader::decode(&mut packet.payload)?;
186 let filters = TopicFilters::decode(&mut packet.payload)?;
187
188 Ok(Unsubscribe::new(
189 header.packet_id,
190 header.properties,
191 filters,
192 ))
193 }
194}
195
196impl traits::Unsubscribe for Unsubscribe {}