rmqtt_codec/lib.rs
1#![deny(unsafe_code)]
2
//! MQTT protocol codec implementation with multi-version support and version negotiation
//!
//! ## Core Features:
//! - **Multi-Version Protocol Support**: Full implementation of MQTT v3.1, v3.1.1 and v5.0 specifications
//! - **Automatic Version Detection**: Handshake-based protocol negotiation during connection establishment
//! - **Zero-Copy Encoding**: Efficient binary processing using `bytes::BytesMut` for network operations
//! - **Tokio Integration**: Seamless compatibility with Tokio runtime via `tokio_util::codec`
//! - **Memory Safety**: Strict enforcement of message size limits (1MB default) with configurable constraints
//!
//! ## Architecture Components:
//! - `MqttCodec`: Main dispatcher handling version-specific encoding/decoding logic
//! - `MqttPacket`: Unified representation of all protocol versions' packet types
//! - `version::ProtocolVersion`: Detection mechanism for protocol handshake
//! - Error handling with dedicated `EncodeError`/`DecodeError` types
//!
18
19#[macro_use]
20mod utils;
21
22/// Error types for encoding/decoding operations
23pub mod error;
24
25/// Shared types and constants for MQTT protocol
26pub mod types;
27
28/// MQTT v3.1.1 protocol implementation
29pub mod v3;
30
31/// MQTT v5.0 protocol implementation
32pub mod v5;
33
34/// Protocol version detection and negotiation
35pub mod version;
36
37/// Main MQTT protocol codec implementation
38///
39/// Handles version negotiation and provides unified interface for:
40/// - MQTT v3.1.1
41/// - MQTT v5.0
42/// - Protocol version detection
43#[derive(Debug)]
44pub enum MqttCodec {
45 /// MQTT v3.1.1 codec
46 V3(v3::Codec),
47 /// MQTT v5.0 codec
48 V5(v5::Codec),
49 /// Protocol version detection codec (used during initial handshake)
50 Version(version::VersionCodec),
51}
52
53/// Decoded MQTT protocol packets
54///
55/// Represents all possible packet types across supported protocol versions
56/// plus version detection results during handshake
57#[derive(Debug)]
58pub enum MqttPacket {
59 /// MQTT v3.1.1 protocol packet
60 V3(v3::Packet),
61 /// MQTT v5.0 protocol packet
62 V5(v5::Packet),
63 /// Protocol version detection result
64 Version(version::ProtocolVersion),
65}
66
67impl tokio_util::codec::Encoder<MqttPacket> for MqttCodec {
68 type Error = error::EncodeError;
69
70 /// Encodes MQTT packets according to active protocol version
71 ///
72 /// # Example
73 /// ```
74 /// use bytes::BytesMut;
75 /// use rmqtt_codec::{MqttCodec, MqttPacket, v3};
76 /// use tokio_util::codec::Encoder;
77 ///
78 /// let mut codec = MqttCodec::V3(v3::Codec::new(1024*1024));
79 /// let mut buffer = BytesMut::new();
80 /// let packet = MqttPacket::V3(v3::Packet::PingRequest);
81 /// codec.encode(packet, &mut buffer).unwrap();
82 /// ```
83 #[inline]
84 fn encode(&mut self, item: MqttPacket, dst: &mut bytes::BytesMut) -> Result<(), Self::Error> {
85 match self {
86 MqttCodec::V3(codec) => match item {
87 MqttPacket::V3(p) => {
88 codec.encode(p, dst)?;
89 }
90 _ => return Err(error::EncodeError::MalformedPacket),
91 },
92 MqttCodec::V5(codec) => match item {
93 MqttPacket::V5(p) => {
94 codec.encode(p, dst)?;
95 }
96 _ => return Err(error::EncodeError::MalformedPacket),
97 },
98 MqttCodec::Version(_) => return Err(error::EncodeError::UnsupportedVersion),
99 };
100 Ok(())
101 }
102}
103
104impl tokio_util::codec::Decoder for MqttCodec {
105 type Item = (MqttPacket, u32);
106 type Error = error::DecodeError;
107
108 /// Decodes network bytes into MQTT packets
109 ///
110 /// Returns tuple containing:
111 /// - Decoded packet
112 /// - Number of bytes consumed from input buffer
113 ///
114 /// # Example
115 /// ```
116 /// use bytes::{BytesMut, BufMut};
117 /// use rmqtt_codec::{MqttCodec, v3};
118 /// use tokio_util::codec::Decoder;
119 ///
120 /// let mut codec = MqttCodec::V3(v3::Codec::new(1024*1024));
121 /// let mut buffer = BytesMut::new();
122 /// buffer.put_slice(b"\x30\x00"); // Publish packet
123 /// let packet = codec.decode(&mut buffer);
124 /// ```
125 fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
126 let p = match self {
127 MqttCodec::V3(codec) => codec.decode(src)?.map(|(p, remaining)| (MqttPacket::V3(p), remaining)),
128 MqttCodec::V5(codec) => codec.decode(src)?.map(|(p, remaining)| (MqttPacket::V5(p), remaining)),
129 MqttCodec::Version(codec) => codec.decode(src)?.map(|v| (MqttPacket::Version(v), 0)),
130 };
131 Ok(p)
132 }
133}