mqute_codec/protocol/v5/unsubscribe.rs
1//! # Unsubscribe Packet - MQTT v5
2//!
3//! This module implements the MQTT v5 `Unsubscribe` packet, which is sent by clients to
4//! request unsubscription from one or more topics. The packet includes the list of
5//! topic filters to unsubscribe from and optional properties.
6
7use crate::Error;
8use crate::codec::util::decode_byte;
9use crate::codec::{Decode, Encode, RawPacket};
10use crate::protocol::v5::property::{
11 Property, PropertyFrame, property_decode, property_encode, property_len,
12};
13use crate::protocol::v5::util::id_header;
14use crate::protocol::{FixedHeader, Flags, PacketType, QoS, TopicFilters};
15use bytes::{Buf, Bytes, BytesMut};
16
/// Properties specific to `Unsubscribe` packets
///
/// In MQTT v5, `Unsubscribe` packets can include:
/// - User Properties (key-value pairs for extended metadata)
///
/// User Properties are the only property kind this type carries; its
/// `PropertyFrame::decode` implementation answers any other property
/// with `Error::PropertyMismatch`.
///
/// # Example
///
/// ```rust
/// use mqute_codec::protocol::v5::UnsubscribeProperties;
///
/// let properties = UnsubscribeProperties {
///     user_properties: vec![("client".into(), "rust".into())],
/// };
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnsubscribeProperties {
    /// User-defined key-value properties, one `(String, String)` tuple per entry
    pub user_properties: Vec<(String, String)>,
}
36
37impl PropertyFrame for UnsubscribeProperties {
38 /// Calculates the encoded length of the properties
39 fn encoded_len(&self) -> usize {
40 let mut len = 0usize;
41 len += property_len!(&self.user_properties);
42 len
43 }
44
45 /// Encodes the properties into a byte buffer
46 fn encode(&self, buf: &mut BytesMut) {
47 property_encode!(&self.user_properties, Property::UserProp, buf);
48 }
49
50 /// Decodes properties from a byte buffer
51 fn decode(buf: &mut Bytes) -> Result<Option<Self>, Error>
52 where
53 Self: Sized,
54 {
55 if buf.is_empty() {
56 return Ok(None);
57 }
58
59 let mut user_properties: Vec<(String, String)> = Vec::new();
60
61 while buf.has_remaining() {
62 let property: Property = decode_byte(buf)?.try_into()?;
63 match property {
64 Property::UserProp => {
65 property_decode!(&mut user_properties, buf);
66 }
67 _ => return Err(Error::PropertyMismatch),
68 }
69 }
70
71 Ok(Some(UnsubscribeProperties { user_properties }))
72 }
73}
74
// Internal header structure for `Unsubscribe` packets.
//
// `id_header!` generates the `UnsubscribeHeader` type used by `Unsubscribe`.
// Within this file it is accessed through its `packet_id` and `properties`
// (an `Option<UnsubscribeProperties>`) fields and through `new`, `encode`,
// `encoded_len` and `decode`.
id_header!(UnsubscribeHeader, UnsubscribeProperties);
77
/// Represents an MQTT v5 `Unsubscribe` packet
///
/// The `Unsubscribe` packet is sent by clients to request removal of existing
/// subscriptions. It contains:
/// - Packet Identifier (for QoS 1 acknowledgment)
/// - List of topic filters to unsubscribe from
/// - Optional properties (v5 only)
///
/// The packet is always encoded with `Flags::new(QoS::AtLeastOnce)` in its
/// fixed header, and decoding rejects a packet carrying any other flags.
///
/// # Example
///
/// ```rust
/// use mqute_codec::protocol::TopicFilters;
/// use mqute_codec::protocol::v5::{Unsubscribe, UnsubscribeProperties};
///
/// // Simple unsubscribe with no properties
/// let unsubscribe = Unsubscribe::new(
///     1234,
///     None,
///     vec!["sensors/temperature", "control/#"]
/// );
///
/// assert_eq!(unsubscribe.packet_id(), 1234u16);
///
/// // Unsubscribe with properties
/// let properties = UnsubscribeProperties {
///     user_properties: vec![("reason".into(), "client_shutdown".into())],
/// };
/// let unsubscribe = Unsubscribe::new(
///     5678,
///     Some(properties),
///     vec!["debug/logs"]
/// );
///
/// assert_eq!(unsubscribe.filters(), TopicFilters::new(vec!["debug/logs"]));
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Unsubscribe {
    /// Packet identifier together with the optional properties
    header: UnsubscribeHeader,
    /// Topic filters the client wants to unsubscribe from
    filters: TopicFilters,
}
118
119impl Unsubscribe {
120 /// Creates a new `Unsubscribe` packet
121 ///
122 /// # Panics
123 ///
124 /// Panics if the topic filters are invalid according to MQTT topic naming rules.
125 pub fn new<T: IntoIterator<Item: Into<String>>>(
126 packet_id: u16,
127 properties: Option<UnsubscribeProperties>,
128 filters: T,
129 ) -> Self {
130 let filters: Vec<String> = filters.into_iter().map(|x| x.into()).collect();
131
132 Unsubscribe {
133 header: UnsubscribeHeader::new(packet_id, properties),
134 filters: filters.into(),
135 }
136 }
137
138 /// Returns the packet identifier
139 pub fn packet_id(&self) -> u16 {
140 self.header.packet_id
141 }
142
143 /// Returns the unsubscribe properties
144 pub fn properties(&self) -> Option<UnsubscribeProperties> {
145 self.header.properties.clone()
146 }
147
148 /// Returns the topic filters to unsubscribe from
149 pub fn filters(&self) -> TopicFilters {
150 self.filters.clone()
151 }
152}
153
154impl Encode for Unsubscribe {
155 /// Encodes the `Unsubscribe` packet into a byte buffer
156 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
157 let header = FixedHeader::with_flags(
158 PacketType::Unsubscribe,
159 Flags::new(QoS::AtLeastOnce),
160 self.payload_len(),
161 );
162 header.encode(buf)?;
163
164 self.header.encode(buf)?;
165 self.filters.encode(buf);
166 Ok(())
167 }
168
169 /// Calculates the total packet length
170 fn payload_len(&self) -> usize {
171 self.header.encoded_len() + self.filters.encoded_len()
172 }
173}
174
175impl Decode for Unsubscribe {
176 /// Decodes an `Unsubscribe` packet from raw bytes
177 fn decode(mut packet: RawPacket) -> Result<Self, Error> {
178 // Validate header flags
179 if packet.header.packet_type() != PacketType::Unsubscribe
180 || packet.header.flags() != Flags::new(QoS::AtLeastOnce)
181 {
182 return Err(Error::MalformedPacket);
183 }
184
185 let header = UnsubscribeHeader::decode(&mut packet.payload)?;
186 let filters = TopicFilters::decode(&mut packet.payload)?;
187
188 Ok(Unsubscribe::new(
189 header.packet_id,
190 header.properties,
191 filters,
192 ))
193 }
194}