Skip to main content

rmqtt_codec/
lib.rs

1#![deny(unsafe_code)]
2
3//! MQTT protocol codec implementation with multi-version support and version negotiation
4//!
5//! ## Core Features:
6//! - **Dual Protocol Support**: Full implementation of MQTT v3.1, v3.1.1 and v5.0 specifications
7//! - **Automatic Version Detection**: Handshake-based protocol negotiation during connection establishment
8//! - **Zero-Copy Encoding**: Efficient binary processing using `bytes::BytesMut` for network operations
9//! - **Tokio Integration**: Seamless compatibility with Tokio runtime via `tokio_util::codec`
10//! - **Memory Safety**: Strict enforcement of message size limits (1MB default) with configurable constraints
11//!
12//! ## Architecture Components:
13//! - `MqttCodec`: Main dispatcher handling version-specific encoding/decoding logic
14//! - `MqttPacket`: Unified representation of all protocol versions' packet types
15//! - `version::ProtocolVersion`: Detection mechanism for protocol handshake
16//! - Error handling with dedicated `EncodeError`/`DecodeError` types
17//!
18
19#[macro_use]
20mod utils;
21
22/// Error types for encoding/decoding operations
23pub mod error;
24
25/// Shared types and constants for MQTT protocol
26pub mod types;
27
28/// MQTT v3.1.1 protocol implementation
29pub mod v3;
30
31/// MQTT v5.0 protocol implementation
32pub mod v5;
33
34pub mod cert;
35/// Protocol version detection and negotiation
36pub mod version;
37
38/// Main MQTT protocol codec implementation
39///
40/// Handles version negotiation and provides unified interface for:
41/// - MQTT v3.1.1
42/// - MQTT v5.0
43/// - Protocol version detection
44#[derive(Debug)]
45pub enum MqttCodec {
46    /// MQTT v3.1.1 codec
47    V3(v3::Codec),
48    /// MQTT v5.0 codec
49    V5(v5::Codec),
50    /// Protocol version detection codec (used during initial handshake)
51    Version(version::VersionCodec),
52}
53
54/// Decoded MQTT protocol packets
55///
56/// Represents all possible packet types across supported protocol versions
57/// plus version detection results during handshake
58#[derive(Debug)]
59pub enum MqttPacket {
60    /// MQTT v3.1.1 protocol packet
61    V3(v3::Packet),
62    /// MQTT v5.0 protocol packet
63    V5(v5::Packet),
64    /// Protocol version detection result
65    Version(version::ProtocolVersion),
66}
67
68impl tokio_util::codec::Encoder<MqttPacket> for MqttCodec {
69    type Error = error::EncodeError;
70
71    /// Encodes MQTT packets according to active protocol version
72    ///
73    /// # Example
74    /// ```
75    /// use bytes::BytesMut;
76    /// use rmqtt_codec::{MqttCodec, MqttPacket, v3};
77    /// use tokio_util::codec::Encoder;
78    ///
79    /// let mut codec = MqttCodec::V3(v3::Codec::new(1024*1024));
80    /// let mut buffer = BytesMut::new();
81    /// let packet = MqttPacket::V3(v3::Packet::PingRequest);
82    /// codec.encode(packet, &mut buffer).unwrap();
83    /// ```
84    #[inline]
85    fn encode(&mut self, item: MqttPacket, dst: &mut bytes::BytesMut) -> Result<(), Self::Error> {
86        match self {
87            MqttCodec::V3(codec) => match item {
88                MqttPacket::V3(p) => {
89                    codec.encode(p, dst)?;
90                }
91                _ => return Err(error::EncodeError::MalformedPacket),
92            },
93            MqttCodec::V5(codec) => match item {
94                MqttPacket::V5(p) => {
95                    codec.encode(p, dst)?;
96                }
97                _ => return Err(error::EncodeError::MalformedPacket),
98            },
99            MqttCodec::Version(_) => return Err(error::EncodeError::UnsupportedVersion),
100        };
101        Ok(())
102    }
103}
104
105impl tokio_util::codec::Decoder for MqttCodec {
106    type Item = (MqttPacket, u32);
107    type Error = error::DecodeError;
108
109    /// Decodes network bytes into MQTT packets
110    ///
111    /// Returns tuple containing:
112    /// - Decoded packet
113    /// - Number of bytes consumed from input buffer
114    ///
115    /// # Example
116    /// ```
117    /// use bytes::{BytesMut, BufMut};
118    /// use rmqtt_codec::{MqttCodec, v3};
119    /// use tokio_util::codec::Decoder;
120    ///
121    /// let mut codec = MqttCodec::V3(v3::Codec::new(1024*1024));
122    /// let mut buffer = BytesMut::new();
123    /// buffer.put_slice(b"\x30\x00"); // Publish packet
124    /// let packet = codec.decode(&mut buffer);
125    /// ```
126    fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
127        let p = match self {
128            MqttCodec::V3(codec) => codec.decode(src)?.map(|(p, remaining)| (MqttPacket::V3(p), remaining)),
129            MqttCodec::V5(codec) => codec.decode(src)?.map(|(p, remaining)| (MqttPacket::V5(p), remaining)),
130            MqttCodec::Version(codec) => codec.decode(src)?.map(|v| (MqttPacket::Version(v), 0)),
131        };
132        Ok(p)
133    }
134}