mqtt/packet/
subscribe.rs

1//! SUBSCRIBE
2
3use std::io::{self, Read, Write};
4use std::string::FromUtf8Error;
5
6use byteorder::{ReadBytesExt, WriteBytesExt};
7
8use crate::control::variable_header::PacketIdentifier;
9use crate::control::{ControlType, FixedHeader, PacketType};
10use crate::packet::{DecodablePacket, PacketError};
11use crate::topic_filter::{TopicFilter, TopicFilterDecodeError, TopicFilterError};
12use crate::{Decodable, Encodable, QualityOfService};
13
14/// `SUBSCRIBE` packet
15#[derive(Debug, Eq, PartialEq, Clone)]
16pub struct SubscribePacket {
17    fixed_header: FixedHeader,
18    packet_identifier: PacketIdentifier,
19    payload: SubscribePacketPayload,
20}
21
22encodable_packet!(SubscribePacket(packet_identifier, payload));
23
24impl SubscribePacket {
25    pub fn new(pkid: u16, subscribes: Vec<(TopicFilter, QualityOfService)>) -> SubscribePacket {
26        let mut pk = SubscribePacket {
27            fixed_header: FixedHeader::new(PacketType::with_default(ControlType::Subscribe), 0),
28            packet_identifier: PacketIdentifier(pkid),
29            payload: SubscribePacketPayload::new(subscribes),
30        };
31        pk.fix_header_remaining_len();
32        pk
33    }
34
35    pub fn packet_identifier(&self) -> u16 {
36        self.packet_identifier.0
37    }
38
39    pub fn set_packet_identifier(&mut self, pkid: u16) {
40        self.packet_identifier.0 = pkid;
41    }
42
43    pub fn subscribes(&self) -> &[(TopicFilter, QualityOfService)] {
44        &self.payload.subscribes[..]
45    }
46}
47
48impl DecodablePacket for SubscribePacket {
49    type DecodePacketError = SubscribePacketError;
50
51    fn decode_packet<R: Read>(reader: &mut R, fixed_header: FixedHeader) -> Result<Self, PacketError<Self>> {
52        let packet_identifier: PacketIdentifier = PacketIdentifier::decode(reader)?;
53        let payload: SubscribePacketPayload = SubscribePacketPayload::decode_with(
54            reader,
55            fixed_header.remaining_length - packet_identifier.encoded_length(),
56        )
57        .map_err(PacketError::PayloadError)?;
58        Ok(SubscribePacket {
59            fixed_header,
60            packet_identifier,
61            payload,
62        })
63    }
64}
65
66/// Payload of subscribe packet
67#[derive(Debug, Eq, PartialEq, Clone)]
68struct SubscribePacketPayload {
69    subscribes: Vec<(TopicFilter, QualityOfService)>,
70}
71
72impl SubscribePacketPayload {
73    pub fn new(subs: Vec<(TopicFilter, QualityOfService)>) -> SubscribePacketPayload {
74        SubscribePacketPayload { subscribes: subs }
75    }
76}
77
78impl Encodable for SubscribePacketPayload {
79    fn encode<W: Write>(&self, writer: &mut W) -> Result<(), io::Error> {
80        for &(ref filter, ref qos) in self.subscribes.iter() {
81            filter.encode(writer)?;
82            writer.write_u8(*qos as u8)?;
83        }
84
85        Ok(())
86    }
87
88    fn encoded_length(&self) -> u32 {
89        self.subscribes.iter().fold(0, |b, a| b + a.0.encoded_length() + 1)
90    }
91}
92
93impl Decodable for SubscribePacketPayload {
94    type Error = SubscribePacketError;
95    type Cond = u32;
96
97    fn decode_with<R: Read>(
98        reader: &mut R,
99        mut payload_len: u32,
100    ) -> Result<SubscribePacketPayload, SubscribePacketError> {
101        let mut subs = Vec::new();
102
103        while payload_len > 0 {
104            let filter = TopicFilter::decode(reader)?;
105            let qos = match reader.read_u8()? {
106                0 => QualityOfService::Level0,
107                1 => QualityOfService::Level1,
108                2 => QualityOfService::Level2,
109                _ => return Err(SubscribePacketError::InvalidQualityOfService),
110            };
111
112            payload_len -= filter.encoded_length() + 1;
113            subs.push((filter, qos));
114        }
115
116        Ok(SubscribePacketPayload::new(subs))
117    }
118}
119
120#[derive(Debug, thiserror::Error)]
121pub enum SubscribePacketError {
122    #[error(transparent)]
123    IoError(#[from] io::Error),
124    #[error(transparent)]
125    FromUtf8Error(#[from] FromUtf8Error),
126    #[error("invalid quality of service")]
127    InvalidQualityOfService,
128    #[error(transparent)]
129    TopicFilterError(#[from] TopicFilterError),
130}
131
132impl From<TopicFilterDecodeError> for SubscribePacketError {
133    fn from(e: TopicFilterDecodeError) -> Self {
134        match e {
135            TopicFilterDecodeError::IoError(e) => e.into(),
136            TopicFilterDecodeError::InvalidTopicFilter(e) => e.into(),
137        }
138    }
139}