1use std::io::{self, Read, Write};
4use std::string::FromUtf8Error;
5
6use byteorder::{ReadBytesExt, WriteBytesExt};
7
8use crate::control::variable_header::PacketIdentifier;
9use crate::control::{ControlType, FixedHeader, PacketType};
10use crate::packet::{DecodablePacket, PacketError};
11use crate::topic_filter::{TopicFilter, TopicFilterDecodeError, TopicFilterError};
12use crate::{Decodable, Encodable, QualityOfService};
13
14#[derive(Debug, Eq, PartialEq, Clone)]
16pub struct SubscribePacket {
17 fixed_header: FixedHeader,
18 packet_identifier: PacketIdentifier,
19 payload: SubscribePacketPayload,
20}
21
22encodable_packet!(SubscribePacket(packet_identifier, payload));
23
24impl SubscribePacket {
25 pub fn new(pkid: u16, subscribes: Vec<(TopicFilter, QualityOfService)>) -> SubscribePacket {
26 let mut pk = SubscribePacket {
27 fixed_header: FixedHeader::new(PacketType::with_default(ControlType::Subscribe), 0),
28 packet_identifier: PacketIdentifier(pkid),
29 payload: SubscribePacketPayload::new(subscribes),
30 };
31 pk.fix_header_remaining_len();
32 pk
33 }
34
35 pub fn packet_identifier(&self) -> u16 {
36 self.packet_identifier.0
37 }
38
39 pub fn set_packet_identifier(&mut self, pkid: u16) {
40 self.packet_identifier.0 = pkid;
41 }
42
43 pub fn subscribes(&self) -> &[(TopicFilter, QualityOfService)] {
44 &self.payload.subscribes[..]
45 }
46}
47
48impl DecodablePacket for SubscribePacket {
49 type DecodePacketError = SubscribePacketError;
50
51 fn decode_packet<R: Read>(reader: &mut R, fixed_header: FixedHeader) -> Result<Self, PacketError<Self>> {
52 let packet_identifier: PacketIdentifier = PacketIdentifier::decode(reader)?;
53 let payload: SubscribePacketPayload = SubscribePacketPayload::decode_with(
54 reader,
55 fixed_header.remaining_length - packet_identifier.encoded_length(),
56 )
57 .map_err(PacketError::PayloadError)?;
58 Ok(SubscribePacket {
59 fixed_header,
60 packet_identifier,
61 payload,
62 })
63 }
64}
65
66#[derive(Debug, Eq, PartialEq, Clone)]
68struct SubscribePacketPayload {
69 subscribes: Vec<(TopicFilter, QualityOfService)>,
70}
71
72impl SubscribePacketPayload {
73 pub fn new(subs: Vec<(TopicFilter, QualityOfService)>) -> SubscribePacketPayload {
74 SubscribePacketPayload { subscribes: subs }
75 }
76}
77
78impl Encodable for SubscribePacketPayload {
79 fn encode<W: Write>(&self, writer: &mut W) -> Result<(), io::Error> {
80 for &(ref filter, ref qos) in self.subscribes.iter() {
81 filter.encode(writer)?;
82 writer.write_u8(*qos as u8)?;
83 }
84
85 Ok(())
86 }
87
88 fn encoded_length(&self) -> u32 {
89 self.subscribes.iter().fold(0, |b, a| b + a.0.encoded_length() + 1)
90 }
91}
92
93impl Decodable for SubscribePacketPayload {
94 type Error = SubscribePacketError;
95 type Cond = u32;
96
97 fn decode_with<R: Read>(
98 reader: &mut R,
99 mut payload_len: u32,
100 ) -> Result<SubscribePacketPayload, SubscribePacketError> {
101 let mut subs = Vec::new();
102
103 while payload_len > 0 {
104 let filter = TopicFilter::decode(reader)?;
105 let qos = match reader.read_u8()? {
106 0 => QualityOfService::Level0,
107 1 => QualityOfService::Level1,
108 2 => QualityOfService::Level2,
109 _ => return Err(SubscribePacketError::InvalidQualityOfService),
110 };
111
112 payload_len -= filter.encoded_length() + 1;
113 subs.push((filter, qos));
114 }
115
116 Ok(SubscribePacketPayload::new(subs))
117 }
118}
119
120#[derive(Debug, thiserror::Error)]
121pub enum SubscribePacketError {
122 #[error(transparent)]
123 IoError(#[from] io::Error),
124 #[error(transparent)]
125 FromUtf8Error(#[from] FromUtf8Error),
126 #[error("invalid quality of service")]
127 InvalidQualityOfService,
128 #[error(transparent)]
129 TopicFilterError(#[from] TopicFilterError),
130}
131
132impl From<TopicFilterDecodeError> for SubscribePacketError {
133 fn from(e: TopicFilterDecodeError) -> Self {
134 match e {
135 TopicFilterDecodeError::IoError(e) => e.into(),
136 TopicFilterDecodeError::InvalidTopicFilter(e) => e.into(),
137 }
138 }
139}