1use core::{
2 default::Default,
3 convert::TryFrom,
4 cmp::min,
5 result::Result,
6};
7
8use crate::{
9 fixed_header::{self, FixedHeader},
10 variable_header::{self, VariableHeader},
11 payload::{self, Payload},
12 status::Status,
13 error::{DecodeError, EncodeError},
14 codec::{Decodable, Encodable},
15 qos,
16};
17
18#[derive(Debug)]
19#[allow(dead_code)]
20pub struct Packet<'a> {
21 fixed_header: FixedHeader,
22 variable_header: Option<VariableHeader<'a>>,
23 payload: Payload<'a>,
24}
25
26impl<'a> Packet<'a> {
30 pub fn connect(variable_header: variable_header::connect::Connect<'a>, payload: payload::connect::Connect<'a>) -> Result<Self, EncodeError> {
32 Self::packet(
33 fixed_header::PacketType::Connect,
34 fixed_header::PacketFlags::CONNECT,
35 Some(variable_header::VariableHeader::Connect(variable_header)),
36 payload::Payload::Connect(payload)
37 )
38 }
39
40 pub fn subscribe(variable_header: variable_header::packet_identifier::PacketIdentifier, payload: payload::subscribe::Subscribe<'a>) -> Result<Self, EncodeError> {
42 Self::packet(
43 fixed_header::PacketType::Subscribe,
44 fixed_header::PacketFlags::SUBSCRIBE,
45 Some(variable_header::VariableHeader::Subscribe(variable_header)),
46 payload::Payload::Subscribe(payload)
47 )
48 }
49
50 pub fn publish(flags: fixed_header::PublishFlags, variable_header: variable_header::publish::Publish<'a>, payload: &'a [u8]) -> Result<Self, EncodeError> {
52 assert!(flags.qos().expect("valid qos") == qos::QoS::AtMostOnce || variable_header.packet_identifier().is_some());
54
55 Self::packet(
56 fixed_header::PacketType::Publish,
57 flags.into(),
58 Some(variable_header::VariableHeader::Publish(variable_header)),
59 payload::Payload::Bytes(payload)
60 )
61 }
62
63 pub fn puback(variable_header: variable_header::packet_identifier::PacketIdentifier) -> Result<Self, EncodeError> {
64 Self::packet(
65 fixed_header::PacketType::Puback,
66 fixed_header::PacketFlags::PUBACK,
67 Some(variable_header::VariableHeader::Puback(variable_header)),
68 Default::default(),
69 )
70 }
71
72 pub fn pingreq() -> Self {
74 Self {
75 fixed_header: FixedHeader::new(
76 fixed_header::PacketType::Pingreq,
77 fixed_header::PacketFlags::PINGREQ,
78 0,
79 ),
80 variable_header: None,
81 payload: Default::default(),
82 }
83 }
84
85 pub fn pingresp() -> Self {
87 Self {
88 fixed_header: FixedHeader::new(
89 fixed_header::PacketType::Pingresp,
90 fixed_header::PacketFlags::PINGRESP,
91 0,
92 ),
93 variable_header: None,
94 payload: Default::default(),
95 }
96 }
97
98 fn packet(r#type: fixed_header::PacketType, flags: fixed_header::PacketFlags, variable_header: Option<VariableHeader<'a>>, payload: Payload<'a>) -> Result<Self, EncodeError> {
103 let len = u32::try_from(
104 variable_header.as_ref().map(VariableHeader::encoded_len).unwrap_or(0) +
105 payload.encoded_len()
106 )?;
107
108 Ok(Self {
109 fixed_header: FixedHeader::new(
110 r#type,
111 flags,
112 len,
113 ),
114 variable_header: variable_header,
115 payload: payload,
116 })
117 }
118
119 pub fn fixed_header(&self) -> &FixedHeader {
123 &self.fixed_header
124 }
125
126 pub fn variable_header(&self) -> &Option<VariableHeader> {
128 &self.variable_header
129 }
130
131 pub fn payload(&self) -> &Payload {
133 &self.payload
134 }
135}
136
137impl<'a> Decodable<'a> for Packet<'a> {
138 fn decode(bytes: &'a [u8]) -> Result<Status<(usize, Self)>, DecodeError> {
151 let (fixed_header_offset, fixed_header) = read!(FixedHeader::decode, bytes, 0);
152
153 let (variable_header_consumed, variable_header) = if let Some(result) = VariableHeader::decode(fixed_header.r#type(), fixed_header.flags(), &bytes[fixed_header_offset..]) {
154 let (variable_header_offset, variable_header) = complete!(result);
155 (variable_header_offset, Some(variable_header))
156 } else {
157 (0, None)
158 };
159
160 let payload_len = fixed_header.len() as usize - variable_header_consumed;
161
162 let available = bytes.len() - (fixed_header_offset + variable_header_consumed);
163 let needed = payload_len - min(available, payload_len);
164 if needed > 0 {
165 return Ok(Status::Partial(needed));
166 }
167
168 let payload_bytes = &bytes[fixed_header_offset+variable_header_consumed..fixed_header_offset+variable_header_consumed+payload_len];
169
170 let payload = if let Some(result) = Payload::decode(fixed_header.r#type(), payload_bytes) {
171 match result {
172 Err(e) => return Err(e),
173 Ok(Status::Partial(n)) => return Ok(Status::Partial(n)),
174 Ok(Status::Complete((_, payload))) => payload,
175 }
176 } else {
177 payload::Payload::Bytes(payload_bytes)
178 };
179
180 Ok(Status::Complete((fixed_header_offset + fixed_header.len() as usize, Self {
181 fixed_header,
182 variable_header,
183 payload,
184 })))
185 }
186}
187
188impl<'a> Encodable for Packet<'a> {
189 fn encoded_len(&self) -> usize {
193 self.fixed_header.encoded_len() + self.fixed_header.len() as usize
194 }
195
196 fn encode(&self, bytes: &mut [u8]) -> Result<usize, EncodeError> {
203 let mut offset = 0;
204
205 offset = {
206 let o = self.fixed_header.encode(&mut bytes[offset..])?;
207 offset + o
208 };
209
210 if let Some(ref variable_header) = self.variable_header {
211 offset = {
212 let o = variable_header.encode(&mut bytes[offset..])?;
213 offset + o
214 };
215 }
216
217 let offset = {
218 let o = self.payload.encode(&mut bytes[offset..])?;
219 offset + o
220 };
221
222 Ok(offset)
223 }
224}
225
226#[cfg(test)]
227mod tests {
228 use super::*;
229
230 #[test]
231 fn encode_publish() {
232 let payload = b"{}";
233 assert_eq!(2, payload.len());
234
235 let mut publish_flags = fixed_header::PublishFlags::default();
236 publish_flags.set_qos(qos::QoS::AtLeastOnce);
237 let publish_id = 2;
238 let publish = Packet::publish(
239 publish_flags,
240 variable_header::publish::Publish::new(
241 "a/b",
242 Some(publish_id),
243 ),
244 payload
245 ).expect("valid packet");
246
247 assert_eq!(11, publish.encoded_len());
248 assert_eq!(2, publish.fixed_header().encoded_len());
249 assert_eq!(9, publish.fixed_header().len());
250 assert_eq!(7, publish.variable_header().as_ref().expect("variable header").encoded_len());
251 assert_eq!(2, publish.payload().encoded_len());
252 }
253
254 #[test]
255 fn encode_subscribe() {
256 let subscribe_id = 1;
257 let sub = Packet::subscribe(
258 variable_header::packet_identifier::PacketIdentifier::new(subscribe_id),
259 payload::subscribe::Subscribe::new(&[
260 ("c/a", qos::QoS::AtMostOnce),
261 ("c/b", qos::QoS::AtLeastOnce),
262 ("c/c", qos::QoS::ExactlyOnce),
263 ]),
264 ).expect("valid packet");
265
266 assert_eq!(22, sub.encoded_len());
267 assert_eq!(2, sub.fixed_header().encoded_len());
268 assert_eq!(20, sub.fixed_header().len());
269 assert_eq!(2, sub.variable_header().as_ref().expect("variable header").encoded_len());
270 assert_eq!(18, sub.payload().encoded_len());
271 }
272}