mqtt/packet/
unsubscribe.rs

1//! UNSUBSCRIBE
2
3use std::io::{self, Read, Write};
4use std::string::FromUtf8Error;
5
6use crate::control::variable_header::PacketIdentifier;
7use crate::control::{ControlType, FixedHeader, PacketType};
8use crate::packet::{DecodablePacket, PacketError};
9use crate::topic_filter::{TopicFilter, TopicFilterDecodeError, TopicFilterError};
10use crate::{Decodable, Encodable};
11
12/// `UNSUBSCRIBE` packet
13#[derive(Debug, Eq, PartialEq, Clone)]
14pub struct UnsubscribePacket {
15    fixed_header: FixedHeader,
16    packet_identifier: PacketIdentifier,
17    payload: UnsubscribePacketPayload,
18}
19
20encodable_packet!(UnsubscribePacket(packet_identifier, payload));
21
22impl UnsubscribePacket {
23    pub fn new(pkid: u16, subscribes: Vec<TopicFilter>) -> UnsubscribePacket {
24        let mut pk = UnsubscribePacket {
25            fixed_header: FixedHeader::new(PacketType::with_default(ControlType::Unsubscribe), 0),
26            packet_identifier: PacketIdentifier(pkid),
27            payload: UnsubscribePacketPayload::new(subscribes),
28        };
29        pk.fix_header_remaining_len();
30        pk
31    }
32
33    pub fn packet_identifier(&self) -> u16 {
34        self.packet_identifier.0
35    }
36
37    pub fn set_packet_identifier(&mut self, pkid: u16) {
38        self.packet_identifier.0 = pkid;
39    }
40
41    pub fn subscribes(&self) -> &[TopicFilter] {
42        &self.payload.subscribes[..]
43    }
44}
45
46impl DecodablePacket for UnsubscribePacket {
47    type DecodePacketError = UnsubscribePacketError;
48
49    fn decode_packet<R: Read>(reader: &mut R, fixed_header: FixedHeader) -> Result<Self, PacketError<Self>> {
50        let packet_identifier: PacketIdentifier = PacketIdentifier::decode(reader)?;
51        let payload: UnsubscribePacketPayload = UnsubscribePacketPayload::decode_with(
52            reader,
53            fixed_header.remaining_length - packet_identifier.encoded_length(),
54        )
55        .map_err(PacketError::PayloadError)?;
56        Ok(UnsubscribePacket {
57            fixed_header,
58            packet_identifier,
59            payload,
60        })
61    }
62}
63
64#[derive(Debug, Eq, PartialEq, Clone)]
65struct UnsubscribePacketPayload {
66    subscribes: Vec<TopicFilter>,
67}
68
69impl UnsubscribePacketPayload {
70    pub fn new(subs: Vec<TopicFilter>) -> UnsubscribePacketPayload {
71        UnsubscribePacketPayload { subscribes: subs }
72    }
73}
74
75impl Encodable for UnsubscribePacketPayload {
76    fn encode<W: Write>(&self, writer: &mut W) -> Result<(), io::Error> {
77        for filter in self.subscribes.iter() {
78            filter.encode(writer)?;
79        }
80
81        Ok(())
82    }
83
84    fn encoded_length(&self) -> u32 {
85        self.subscribes.iter().fold(0, |b, a| b + a.encoded_length())
86    }
87}
88
89impl Decodable for UnsubscribePacketPayload {
90    type Error = UnsubscribePacketError;
91    type Cond = u32;
92
93    fn decode_with<R: Read>(
94        reader: &mut R,
95        mut payload_len: u32,
96    ) -> Result<UnsubscribePacketPayload, UnsubscribePacketError> {
97        let mut subs = Vec::new();
98
99        while payload_len > 0 {
100            let filter = TopicFilter::decode(reader)?;
101            payload_len -= filter.encoded_length();
102            subs.push(filter);
103        }
104
105        Ok(UnsubscribePacketPayload::new(subs))
106    }
107}
108
109#[derive(Debug, thiserror::Error)]
110#[error(transparent)]
111pub enum UnsubscribePacketError {
112    IoError(#[from] io::Error),
113    FromUtf8Error(#[from] FromUtf8Error),
114    TopicFilterError(#[from] TopicFilterError),
115}
116
117impl From<TopicFilterDecodeError> for UnsubscribePacketError {
118    fn from(e: TopicFilterDecodeError) -> Self {
119        match e {
120            TopicFilterDecodeError::IoError(e) => e.into(),
121            TopicFilterDecodeError::InvalidTopicFilter(e) => e.into(),
122        }
123    }
124}