1use crate::{
2 codec, Auth, ConnAck, Connect, Disconnect, PacketType, PingReq, PingResp, PubAck, PubComp,
3 PubRec, PubRel, Publish, ReasonCode::ProtocolError, Result as SageResult, SubAck, Subscribe,
4 UnSubAck, UnSubscribe,
5};
6use std::{fmt, marker::Unpin};
7use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
8
9#[derive(Debug)]
10struct FixedHeader {
11 pub packet_type: PacketType,
12 pub remaining_size: usize,
13}
14
15impl FixedHeader {
16 async fn encode<W: AsyncWrite + Unpin>(self, writer: &mut W) -> SageResult<usize> {
17 let mut n = codec::write_control_packet_type(self.packet_type, writer).await?;
18 n += codec::write_variable_byte_integer(self.remaining_size as u32, writer).await?;
19 Ok(n)
20 }
21
22 async fn decode<R: AsyncRead + Unpin>(reader: &mut R) -> SageResult<Self> {
23 let packet_type = codec::read_control_packet_type(reader).await?;
24 let remaining_size = codec::read_variable_byte_integer(reader).await? as usize;
25 Ok(FixedHeader {
26 packet_type,
27 remaining_size,
28 })
29 }
30}
31
32#[derive(Debug, Clone)]
35pub enum Packet {
36 Connect(Connect),
38
39 ConnAck(ConnAck),
41
42 Publish(Publish),
44
45 PubAck(PubAck),
47
48 PubRec(PubRec),
50
51 PubRel(PubRel),
53
54 PubComp(PubComp),
56
57 Subscribe(Subscribe),
59
60 SubAck(SubAck),
62
63 UnSubscribe(UnSubscribe),
65
66 UnSubAck(UnSubAck),
68
69 PingReq,
71
72 PingResp,
74
75 Disconnect(Disconnect),
77
78 Auth(Auth),
80}
81
82impl fmt::Display for Packet {
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 match self {
85 Packet::Connect(_) => write!(f, "Connect"),
86 Packet::ConnAck(connack) => write!(f, "ConnAck [{:?}]", connack.reason_code),
87 Packet::Publish(_) => write!(f, "Publish"),
88 Packet::PubAck(_) => write!(f, "PubAck"),
89 Packet::PubRec(_) => write!(f, "PubRec"),
90 Packet::PubRel(_) => write!(f, "PubRel"),
91 Packet::PubComp(_) => write!(f, "PubComp"),
92 Packet::Subscribe(_) => write!(f, "Subscribe"),
93 Packet::SubAck(_) => write!(f, "SubAck"),
94 Packet::UnSubscribe(_) => write!(f, "UnSubscribe"),
95 Packet::UnSubAck(_) => write!(f, "UnSubAck"),
96 Packet::PingReq => write!(f, "PingReq"),
97 Packet::PingResp => write!(f, "PingResp"),
98 Packet::Disconnect(disconnect) => {
99 write!(f, "Disconnect [{:?}]", disconnect.reason_code)
100 }
101 Packet::Auth(_) => write!(f, "Auth"),
102 }
103 }
104}
105
106impl From<Connect> for Packet {
107 fn from(control: Connect) -> Self {
108 Packet::Connect(control)
109 }
110}
111impl From<ConnAck> for Packet {
112 fn from(control: ConnAck) -> Self {
113 Packet::ConnAck(control)
114 }
115}
116impl From<Publish> for Packet {
117 fn from(control: Publish) -> Self {
118 Packet::Publish(control)
119 }
120}
121impl From<PubAck> for Packet {
122 fn from(control: PubAck) -> Self {
123 Packet::PubAck(control)
124 }
125}
126impl From<PubRec> for Packet {
127 fn from(control: PubRec) -> Self {
128 Packet::PubRec(control)
129 }
130}
131impl From<PubRel> for Packet {
132 fn from(control: PubRel) -> Self {
133 Packet::PubRel(control)
134 }
135}
136impl From<PubComp> for Packet {
137 fn from(control: PubComp) -> Self {
138 Packet::PubComp(control)
139 }
140}
141impl From<Subscribe> for Packet {
142 fn from(control: Subscribe) -> Self {
143 Packet::Subscribe(control)
144 }
145}
146impl From<SubAck> for Packet {
147 fn from(control: SubAck) -> Self {
148 Packet::SubAck(control)
149 }
150}
151impl From<UnSubscribe> for Packet {
152 fn from(control: UnSubscribe) -> Self {
153 Packet::UnSubscribe(control)
154 }
155}
156impl From<UnSubAck> for Packet {
157 fn from(control: UnSubAck) -> Self {
158 Packet::UnSubAck(control)
159 }
160}
161impl From<PingReq> for Packet {
162 fn from(_: PingReq) -> Self {
163 Packet::PingReq
164 }
165}
166impl From<PingResp> for Packet {
167 fn from(_: PingResp) -> Self {
168 Packet::PingResp
169 }
170}
171impl From<Disconnect> for Packet {
172 fn from(control: Disconnect) -> Self {
173 Packet::Disconnect(control)
174 }
175}
176impl From<Auth> for Packet {
177 fn from(control: Auth) -> Self {
178 Packet::Auth(control)
179 }
180}
181
182impl Packet {
183 pub async fn encode<W: AsyncWrite + Unpin>(self, writer: &mut W) -> SageResult<usize> {
188 let mut variable_and_payload = Vec::new();
189 let (packet_type, remaining_size) = match self {
190 Packet::Connect(packet) => (
191 PacketType::Connect,
192 packet.write(&mut variable_and_payload).await?,
193 ),
194 Packet::ConnAck(packet) => (
195 PacketType::ConnAck,
196 packet.write(&mut variable_and_payload).await?,
197 ),
198 Packet::PingReq => (PacketType::PingReq, 0),
199 Packet::PingResp => (PacketType::PingResp, 0),
200 Packet::UnSubAck(packet) => (
201 PacketType::UnSubAck,
202 packet.write(&mut variable_and_payload).await?,
203 ),
204 Packet::Auth(packet) => (
205 PacketType::Auth,
206 packet.write(&mut variable_and_payload).await?,
207 ),
208 Packet::PubAck(packet) => (
209 PacketType::PubAck,
210 packet.write(&mut variable_and_payload).await?,
211 ),
212 Packet::UnSubscribe(packet) => (
213 PacketType::UnSubscribe,
214 packet.write(&mut variable_and_payload).await?,
215 ),
216 Packet::PubRec(packet) => (
217 PacketType::PubRec,
218 packet.write(&mut variable_and_payload).await?,
219 ),
220 Packet::Disconnect(packet) => (
221 PacketType::Disconnect,
222 packet.write(&mut variable_and_payload).await?,
223 ),
224 Packet::PubRel(packet) => (
225 PacketType::PubRel,
226 packet.write(&mut variable_and_payload).await?,
227 ),
228 Packet::SubAck(packet) => (
229 PacketType::SubAck,
230 packet.write(&mut variable_and_payload).await?,
231 ),
232 Packet::PubComp(packet) => (
233 PacketType::PubComp,
234 packet.write(&mut variable_and_payload).await?,
235 ),
236 Packet::Subscribe(packet) => (
237 PacketType::Subscribe,
238 packet.write(&mut variable_and_payload).await?,
239 ),
240 Packet::Publish(packet) => (
241 PacketType::Publish {
242 duplicate: packet.duplicate,
243 qos: packet.qos,
244 retain: packet.retain,
245 },
246 packet.write(&mut variable_and_payload).await?,
247 ),
248 };
249
250 let mut fixed_header_buffer = Vec::new();
251
252 let fixed_size = FixedHeader {
253 packet_type,
254 remaining_size,
255 }
256 .encode(&mut fixed_header_buffer)
257 .await?;
258
259 writer.write_all(&fixed_header_buffer).await?;
260 writer.write_all(&variable_and_payload).await?;
261 Ok(fixed_size + remaining_size)
262 }
263
264 pub async fn decode<R: AsyncRead + Unpin>(reader: &mut R) -> SageResult<Self> {
268 let fixed_header = FixedHeader::decode(reader).await?;
269
270 let packet = match fixed_header.packet_type {
271 PacketType::Connect => Packet::Connect(Connect::read(reader).await?),
272 PacketType::ConnAck => Packet::ConnAck(ConnAck::read(reader).await?),
273 PacketType::PubAck => {
274 Packet::PubAck(PubAck::read(reader, fixed_header.remaining_size == 2).await?)
275 }
276 PacketType::PubRec => {
277 Packet::PubRec(PubRec::read(reader, fixed_header.remaining_size == 2).await?)
278 }
279 PacketType::PingReq => Packet::PingReq,
280 PacketType::PingResp => Packet::PingResp,
281 PacketType::SubAck => {
282 Packet::SubAck(SubAck::read(reader, fixed_header.remaining_size).await?)
283 }
284 PacketType::UnSubscribe => {
285 Packet::UnSubscribe(UnSubscribe::read(reader, fixed_header.remaining_size).await?)
286 }
287 PacketType::Auth => Packet::Auth(Auth::read(reader).await?),
288 PacketType::PubRel => {
289 Packet::PubRel(PubRel::read(reader, fixed_header.remaining_size == 2).await?)
290 }
291 PacketType::Disconnect => Packet::Disconnect(Disconnect::read(reader).await?),
292 PacketType::PubComp => {
293 Packet::PubComp(PubComp::read(reader, fixed_header.remaining_size == 2).await?)
294 }
295
296 PacketType::Subscribe => {
297 Packet::Subscribe(Subscribe::read(reader, fixed_header.remaining_size).await?)
298 }
299
300 PacketType::UnSubAck => {
301 Packet::UnSubAck(UnSubAck::read(reader, fixed_header.remaining_size).await?)
302 }
303
304 PacketType::Publish {
305 duplicate,
306 qos,
307 retain,
308 } => Packet::Publish(
309 Publish::read(
310 reader,
311 duplicate,
312 qos,
313 retain,
314 fixed_header.remaining_size as u64,
315 )
316 .await?,
317 ),
318 _ => return Err(ProtocolError.into()),
319 };
320
321 Ok(packet)
322 }
323}