mqute_codec/codec/
packet.rs

1//! # RawPacket and PacketCodec
2//!
3//! This module provides the `RawPacket` struct, which represents a raw MQTT packet,
4//! and the `PacketCodec` struct, which implements encoding and decoding of MQTT packets
5//! using the `tokio_util::codec` traits.
6//!
7//! - `RawPacket`: Represents a raw MQTT packet, consisting of a fixed header and a payload.
8//! - `PacketCodec`: A codec for encoding and decoding MQTT packets, with support for
9//!   size limits on inbound and outbound packets.
10
11use super::{Encode, Encoded};
12use crate::protocol::FixedHeader;
13use crate::Error;
14use bytes::{Buf, Bytes, BytesMut};
15use tokio_util::codec::{Decoder, Encoder};
16
17/// Represents a raw MQTT packet.
18///
19/// A raw packet consists of:
20/// - A fixed header (`FixedHeader`), which contains metadata about the packet.
21/// - A payload (`Bytes`), which contains the variable header and payload data.
22///
23/// # Examples
24///
25/// ```rust
26/// use mqute_codec::codec::{RawPacket};
27/// use mqute_codec::protocol::{FixedHeader, PacketType};
28/// use bytes::Bytes;
29///
30/// let header = FixedHeader::new(PacketType::Publish, 4);
31/// let payload = Bytes::from_static(&[0x00, 0x01, 0x02, 0x03]);
32/// let packet = RawPacket::new(header, payload);
33/// ```
34#[derive(Debug, Clone)]
35pub struct RawPacket {
36    /// The fixed header of the packet.
37    pub header: FixedHeader,
38
39    /// The variable header and payload of the packet.
40    pub payload: Bytes,
41}
42
43impl RawPacket {
44    /// Creates a new `RawPacket` with the specified fixed header and payload.
45    ///
46    /// # Panics
47    /// Panics if the length of the payload does not match the remaining length specified
48    /// in the fixed header.
49    pub fn new(header: FixedHeader, payload: Bytes) -> Self {
50        if header.remaining_len() != payload.len() {
51            panic!("Header and payload mismatch");
52        }
53        RawPacket { header, payload }
54    }
55}
56
57/// A codec for encoding and decoding MQTT packets.
58///
59/// The `PacketCodec` struct implements the `Encoder` and `Decoder` traits from the
60/// `tokio_util::codec` module, allowing it to be used with asynchronous I/O frameworks.
61///
62/// # Examples
63///
64/// ```rust
65/// use mqute_codec::codec::PacketCodec;
66/// use mqute_codec::protocol::{FixedHeader, PacketType};
67/// use bytes::BytesMut;
68///
69/// let mut codec = PacketCodec::new(Some(1024), Some(1024));
70/// let mut buffer = BytesMut::from(&[0x30, 0x02, 0x00, 0x01][..]); // Example raw packet
71/// let packet = codec.try_decode(&mut buffer).unwrap();
72/// ```
73#[derive(Debug, Clone, Copy, PartialEq, Eq)]
74pub struct PacketCodec {
75    /// The maximum allowed size for inbound packets.
76    inbound_max_size: Option<usize>,
77
78    /// The maximum allowed size for outbound packets.
79    outbound_max_size: Option<usize>,
80}
81
82impl PacketCodec {
83    /// Creates a new `PacketCodec` with the specified size limits.
84    pub fn new(inbound_max_size: Option<usize>, outbound_max_size: Option<usize>) -> Self {
85        PacketCodec {
86            inbound_max_size,
87            outbound_max_size,
88        }
89    }
90
91    /// Attempts to decode a raw packet from the provided buffer.
92    pub fn try_decode(&self, dst: &mut BytesMut) -> Result<RawPacket, Error> {
93        // Decode the header and check the allowable size
94        let header = FixedHeader::decode(dst, self.inbound_max_size)?;
95
96        let mut payload = dst.split_to(header.packet_len()).freeze();
97
98        // Skip the header data
99        payload.advance(header.fixed_len());
100
101        Ok(RawPacket::new(header, payload))
102    }
103}
104
105impl<T> Encoder<T> for PacketCodec
106where
107    T: Encode,
108{
109    type Error = Error;
110
111    /// Encodes an item into the provided buffer.
112    ///
113    /// # Examples
114    ///
115    /// ```rust
116    /// use mqute_codec::codec::{PacketCodec, Encode};
117    /// use tokio_util::codec::Encoder;
118    /// use bytes::BytesMut;
119    /// use mqute_codec::Error;
120    ///
121    /// struct MyPacket;
122    ///
123    /// impl Encode for MyPacket {
124    ///     fn encode(&self, dst: &mut BytesMut) -> Result<(), Error> {
125    ///         dst.extend_from_slice(&[0x30, 0x02, 0x00, 0x01]);
126    ///         Ok(())
127    ///     }
128    ///
129    /// fn payload_len(&self) -> usize {
130    ///         4
131    ///     }
132    /// }
133    ///
134    /// let mut codec = PacketCodec::new(Some(1024), Some(1024));
135    /// let mut buffer = BytesMut::new();
136    /// let packet = MyPacket {};
137    /// codec.encode(packet, &mut buffer).unwrap();
138    ///
139    /// assert_eq!(buffer.as_ref(), &[0x30, 0x02, 0x00, 0x01]);
140    /// ```
141    fn encode(&mut self, item: T, dst: &mut BytesMut) -> Result<(), Self::Error> {
142        if let Some(max_size) = self.outbound_max_size {
143            if item.encoded_len() > max_size {
144                return Err(Error::OutgoingPayloadSizeLimitExceeded(item.encoded_len()));
145            }
146        }
147
148        item.encode(dst)
149    }
150}
151
152impl Decoder for PacketCodec {
153    type Item = RawPacket;
154    type Error = Error;
155
156    /// Decodes a raw packet from the provided buffer.
157    ///
158    /// # Examples
159    ///
160    /// ```rust
161    /// use mqute_codec::codec::PacketCodec;
162    /// use mqute_codec::protocol::PacketType;
163    /// use tokio_util::codec::Decoder;
164    /// use bytes::BytesMut;
165    ///
166    /// let mut codec = PacketCodec::new(Some(1024), Some(1024));
167    /// let mut buffer = BytesMut::from(&[0x30, 0x02, 0x00, 0x01][..]); // Example raw packet
168    /// let packet = codec.decode(&mut buffer).unwrap().unwrap();
169    ///
170    /// assert_eq!(packet.header.packet_type(), PacketType::Publish);
171    /// assert_eq!(packet.payload.len(), 2);
172    /// ```
173    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
174        match self.try_decode(src) {
175            Ok(packet) => Ok(Some(packet)),
176            Err(Error::NotEnoughBytes(len)) => {
177                // Get more packets to construct the incomplete packet
178                src.reserve(len);
179                Ok(None)
180            }
181            Err(e) => Err(e),
182        }
183    }
184}