rmqtt_codec/lib.rs
1#![deny(unsafe_code)]
2
3//! MQTT protocol codec implementation with multi-version support and version negotiation
4//!
5//! ## Core Features:
//! - **Multi-Version Protocol Support**: Full implementation of the MQTT v3.1, v3.1.1 and v5.0 specifications
7//! - **Automatic Version Detection**: Handshake-based protocol negotiation during connection establishment
8//! - **Zero-Copy Encoding**: Efficient binary processing using `bytes::BytesMut` for network operations
9//! - **Tokio Integration**: Seamless compatibility with Tokio runtime via `tokio_util::codec`
10//! - **Memory Safety**: Strict enforcement of message size limits (1MB default) with configurable constraints
11//!
12//! ## Architecture Components:
13//! - `MqttCodec`: Main dispatcher handling version-specific encoding/decoding logic
14//! - `MqttPacket`: Unified representation of all protocol versions' packet types
15//! - `version::ProtocolVersion`: Detection mechanism for protocol handshake
16//! - Error handling with dedicated `EncodeError`/`DecodeError` types
17//!
18
19#[macro_use]
20mod utils;
21
22/// Error types for encoding/decoding operations
23pub mod error;
24
25/// Shared types and constants for MQTT protocol
26pub mod types;
27
28/// MQTT v3.1.1 protocol implementation
29pub mod v3;
30
31/// MQTT v5.0 protocol implementation
32pub mod v5;
33
34pub mod cert;
35/// Protocol version detection and negotiation
36pub mod version;
37
38/// Main MQTT protocol codec implementation
39///
40/// Handles version negotiation and provides unified interface for:
41/// - MQTT v3.1.1
42/// - MQTT v5.0
43/// - Protocol version detection
44#[derive(Debug)]
45pub enum MqttCodec {
46 /// MQTT v3.1.1 codec
47 V3(v3::Codec),
48 /// MQTT v5.0 codec
49 V5(v5::Codec),
50 /// Protocol version detection codec (used during initial handshake)
51 Version(version::VersionCodec),
52}
53
54/// Decoded MQTT protocol packets
55///
56/// Represents all possible packet types across supported protocol versions
57/// plus version detection results during handshake
58#[derive(Debug)]
59pub enum MqttPacket {
60 /// MQTT v3.1.1 protocol packet
61 V3(v3::Packet),
62 /// MQTT v5.0 protocol packet
63 V5(v5::Packet),
64 /// Protocol version detection result
65 Version(version::ProtocolVersion),
66}
67
68impl tokio_util::codec::Encoder<MqttPacket> for MqttCodec {
69 type Error = error::EncodeError;
70
71 /// Encodes MQTT packets according to active protocol version
72 ///
73 /// # Example
74 /// ```
75 /// use bytes::BytesMut;
76 /// use rmqtt_codec::{MqttCodec, MqttPacket, v3};
77 /// use tokio_util::codec::Encoder;
78 ///
79 /// let mut codec = MqttCodec::V3(v3::Codec::new(1024*1024));
80 /// let mut buffer = BytesMut::new();
81 /// let packet = MqttPacket::V3(v3::Packet::PingRequest);
82 /// codec.encode(packet, &mut buffer).unwrap();
83 /// ```
84 #[inline]
85 fn encode(&mut self, item: MqttPacket, dst: &mut bytes::BytesMut) -> Result<(), Self::Error> {
86 match self {
87 MqttCodec::V3(codec) => match item {
88 MqttPacket::V3(p) => {
89 codec.encode(p, dst)?;
90 }
91 _ => return Err(error::EncodeError::MalformedPacket),
92 },
93 MqttCodec::V5(codec) => match item {
94 MqttPacket::V5(p) => {
95 codec.encode(p, dst)?;
96 }
97 _ => return Err(error::EncodeError::MalformedPacket),
98 },
99 MqttCodec::Version(_) => return Err(error::EncodeError::UnsupportedVersion),
100 };
101 Ok(())
102 }
103}
104
105impl tokio_util::codec::Decoder for MqttCodec {
106 type Item = (MqttPacket, u32);
107 type Error = error::DecodeError;
108
109 /// Decodes network bytes into MQTT packets
110 ///
111 /// Returns tuple containing:
112 /// - Decoded packet
113 /// - Number of bytes consumed from input buffer
114 ///
115 /// # Example
116 /// ```
117 /// use bytes::{BytesMut, BufMut};
118 /// use rmqtt_codec::{MqttCodec, v3};
119 /// use tokio_util::codec::Decoder;
120 ///
121 /// let mut codec = MqttCodec::V3(v3::Codec::new(1024*1024));
122 /// let mut buffer = BytesMut::new();
123 /// buffer.put_slice(b"\x30\x00"); // Publish packet
124 /// let packet = codec.decode(&mut buffer);
125 /// ```
126 fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
127 let p = match self {
128 MqttCodec::V3(codec) => codec.decode(src)?.map(|(p, remaining)| (MqttPacket::V3(p), remaining)),
129 MqttCodec::V5(codec) => codec.decode(src)?.map(|(p, remaining)| (MqttPacket::V5(p), remaining)),
130 MqttCodec::Version(codec) => codec.decode(src)?.map(|v| (MqttPacket::Version(v), 0)),
131 };
132 Ok(p)
133 }
134}