mqute_codec/codec/packet.rs
1//! # RawPacket and PacketCodec
2//!
3//! This module provides the `RawPacket` struct, which represents a raw MQTT packet,
4//! and the `PacketCodec` struct, which implements encoding and decoding of MQTT packets
5//! using the `tokio_util::codec` traits.
6//!
7//! - `RawPacket`: Represents a raw MQTT packet, consisting of a fixed header and a payload.
8//! - `PacketCodec`: A codec for encoding and decoding MQTT packets, with support for
9//! size limits on inbound and outbound packets.
10
11use super::{Encode, Encoded};
12use crate::protocol::FixedHeader;
13use crate::Error;
14use bytes::{Buf, Bytes, BytesMut};
15use tokio_util::codec::{Decoder, Encoder};
16
17/// Represents a raw MQTT packet.
18///
19/// A raw packet consists of:
20/// - A fixed header (`FixedHeader`), which contains metadata about the packet.
21/// - A payload (`Bytes`), which contains the variable header and payload data.
22///
23/// # Examples
24///
25/// ```rust
26/// use mqute_codec::codec::{RawPacket};
27/// use mqute_codec::protocol::{FixedHeader, PacketType};
28/// use bytes::Bytes;
29///
30/// let header = FixedHeader::new(PacketType::Publish, 4);
31/// let payload = Bytes::from_static(&[0x00, 0x01, 0x02, 0x03]);
32/// let packet = RawPacket::new(header, payload);
33/// ```
34#[derive(Debug, Clone)]
35pub struct RawPacket {
36 /// The fixed header of the packet.
37 pub header: FixedHeader,
38
39 /// The variable header and payload of the packet.
40 pub payload: Bytes,
41}
42
43impl RawPacket {
44 /// Creates a new `RawPacket` with the specified fixed header and payload.
45 ///
46 /// # Panics
47 /// Panics if the length of the payload does not match the remaining length specified
48 /// in the fixed header.
49 pub fn new(header: FixedHeader, payload: Bytes) -> Self {
50 if header.remaining_len() != payload.len() {
51 panic!("Header and payload mismatch");
52 }
53 RawPacket { header, payload }
54 }
55}
56
/// Codec that turns byte streams into `RawPacket`s and encodable items into bytes.
///
/// `PacketCodec` implements both `Encoder` and `Decoder` from
/// `tokio_util::codec`, so it can be plugged into asynchronous framed I/O.
/// It carries two optional size limits: one for inbound and one for outbound
/// packets.
///
/// # Examples
///
/// ```rust
/// use mqute_codec::codec::PacketCodec;
/// use bytes::BytesMut;
///
/// let codec = PacketCodec::new(Some(1024), Some(1024));
/// let mut buffer = BytesMut::from(&[0x30, 0x02, 0x00, 0x01][..]); // Example raw packet
/// let packet = codec.try_decode(&mut buffer).unwrap();
/// ```
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct PacketCodec {
    /// Upper bound on the size of inbound packets, if any.
    inbound_max_size: Option<usize>,

    /// Upper bound on the size of outbound packets, if any.
    outbound_max_size: Option<usize>,
}
81
82impl PacketCodec {
83 /// Creates a new `PacketCodec` with the specified size limits.
84 pub fn new(inbound_max_size: Option<usize>, outbound_max_size: Option<usize>) -> Self {
85 PacketCodec {
86 inbound_max_size,
87 outbound_max_size,
88 }
89 }
90
91 /// Attempts to decode a raw packet from the provided buffer.
92 pub fn try_decode(&self, dst: &mut BytesMut) -> Result<RawPacket, Error> {
93 // Decode the header and check the allowable size
94 let header = FixedHeader::decode(dst, self.inbound_max_size)?;
95
96 let mut payload = dst.split_to(header.packet_len()).freeze();
97
98 // Skip the header data
99 payload.advance(header.fixed_len());
100
101 Ok(RawPacket::new(header, payload))
102 }
103}
104
105impl<T> Encoder<T> for PacketCodec
106where
107 T: Encode,
108{
109 type Error = Error;
110
111 /// Encodes an item into the provided buffer.
112 ///
113 /// # Examples
114 ///
115 /// ```rust
116 /// use mqute_codec::codec::{PacketCodec, Encode};
117 /// use tokio_util::codec::Encoder;
118 /// use bytes::BytesMut;
119 /// use mqute_codec::Error;
120 ///
121 /// struct MyPacket;
122 ///
123 /// impl Encode for MyPacket {
124 /// fn encode(&self, dst: &mut BytesMut) -> Result<(), Error> {
125 /// dst.extend_from_slice(&[0x30, 0x02, 0x00, 0x01]);
126 /// Ok(())
127 /// }
128 ///
129 /// fn payload_len(&self) -> usize {
130 /// 4
131 /// }
132 /// }
133 ///
134 /// let mut codec = PacketCodec::new(Some(1024), Some(1024));
135 /// let mut buffer = BytesMut::new();
136 /// let packet = MyPacket {};
137 /// codec.encode(packet, &mut buffer).unwrap();
138 ///
139 /// assert_eq!(buffer.as_ref(), &[0x30, 0x02, 0x00, 0x01]);
140 /// ```
141 fn encode(&mut self, item: T, dst: &mut BytesMut) -> Result<(), Self::Error> {
142 if let Some(max_size) = self.outbound_max_size {
143 if item.encoded_len() > max_size {
144 return Err(Error::OutgoingPayloadSizeLimitExceeded(item.encoded_len()));
145 }
146 }
147
148 item.encode(dst)
149 }
150}
151
152impl Decoder for PacketCodec {
153 type Item = RawPacket;
154 type Error = Error;
155
156 /// Decodes a raw packet from the provided buffer.
157 ///
158 /// # Examples
159 ///
160 /// ```rust
161 /// use mqute_codec::codec::PacketCodec;
162 /// use mqute_codec::protocol::PacketType;
163 /// use tokio_util::codec::Decoder;
164 /// use bytes::BytesMut;
165 ///
166 /// let mut codec = PacketCodec::new(Some(1024), Some(1024));
167 /// let mut buffer = BytesMut::from(&[0x30, 0x02, 0x00, 0x01][..]); // Example raw packet
168 /// let packet = codec.decode(&mut buffer).unwrap().unwrap();
169 ///
170 /// assert_eq!(packet.header.packet_type(), PacketType::Publish);
171 /// assert_eq!(packet.payload.len(), 2);
172 /// ```
173 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
174 match self.try_decode(src) {
175 Ok(packet) => Ok(Some(packet)),
176 Err(Error::NotEnoughBytes(len)) => {
177 // Get more packets to construct the incomplete packet
178 src.reserve(len);
179 Ok(None)
180 }
181 Err(e) => Err(e),
182 }
183 }
184}