pub struct GenericConnection<Role, PacketIdType>
where
    Role: RoleType,
    PacketIdType: IsPacketId,
{ /* private fields */ }
Generic MQTT Connection - Core Sans-I/O MQTT protocol implementation
This struct represents the core MQTT protocol logic in a Sans-I/O (synchronous I/O-independent) design. It handles MQTT packet processing, state management, and protocol compliance without performing any actual network I/O operations. Instead, it returns events that the application must handle.
§Type Parameters
- Role: The connection role (Client or Server), determining allowed operations
- PacketIdType: The type used for packet IDs (typically u16, but can be u32 for extended scenarios)
§Key Features
- Sans-I/O Design: No network I/O is performed directly; events are returned for external handling
- Protocol Compliance: Implements MQTT v3.1.1 and v5.0 specifications
- State Management: Tracks connection state, packet IDs, and protocol flows
- Configurable Behavior: Supports various configuration options for different use cases
- Generic Packet ID Support: Can use u16 or u32 packet IDs for different deployment scenarios
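The Sans-I/O idea in the list above can be illustrated with a deliberately tiny model. This is only a sketch: `ToyConnection`, `ToyEvent`, and `drain` are hypothetical names invented for the example and are not part of this crate. The point is the shape of the interaction, where the protocol object only returns events and the application alone touches the transport.

```rust
// Hypothetical, simplified types for illustration only; they are NOT the
// crate's `GenericConnection` or `GenericEvent`.
#[derive(Debug, PartialEq)]
enum ToyEvent {
    /// Ask the application to write these bytes to its transport.
    RequestSend(Vec<u8>),
    /// Report a problem instead of performing I/O or panicking.
    NotifyError(&'static str),
}

#[derive(Default)]
struct ToyConnection {
    connected: bool,
}

impl ToyConnection {
    /// Pure state transition: no sockets, no timers, only returned events.
    fn send_ping(&mut self) -> Vec<ToyEvent> {
        if self.connected {
            // 0xC0 0x00 is the two-byte MQTT PINGREQ packet.
            vec![ToyEvent::RequestSend(vec![0xC0, 0x00])]
        } else {
            vec![ToyEvent::NotifyError("not connected")]
        }
    }

    /// The application reports what it observed on the wire.
    fn notify_connected(&mut self) {
        self.connected = true;
    }
}

/// Application-side loop: the only place where real I/O would happen.
/// Here the "wire" is just a byte vector; returns the number of errors seen.
fn drain(events: Vec<ToyEvent>, wire: &mut Vec<u8>) -> usize {
    let mut errors = 0;
    for event in events {
        match event {
            ToyEvent::RequestSend(bytes) => wire.extend_from_slice(&bytes),
            ToyEvent::NotifyError(_) => errors += 1,
        }
    }
    errors
}
```

Because the protocol logic never blocks or performs I/O, the same state machine can be driven from a blocking socket, an async runtime, or a unit test that inspects the returned events directly.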
§Usage
use mqtt_protocol_core::mqtt;

let mut connection = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);

// Send a packet
let events = connection.send(publish_packet);
for event in events {
    match event {
        mqtt::connection::Event::RequestSendPacket { packet, .. } => {
            // Send packet over network
        },
        // Handle other events...
        _ => {},
    }
}

// Receive data (recv takes a cursor over a byte slice)
let mut cursor = std::io::Cursor::new(&received_data[..]);
let events = connection.recv(&mut cursor);
// Process events...
Implementations§
impl<Role, PacketIdType> GenericConnection<Role, PacketIdType>
where
    Role: RoleType,
    PacketIdType: IsPacketId,
pub fn new(version: Version) -> Self
Create a new MQTT connection instance
Initializes a new MQTT connection with the specified protocol version. The connection starts in a disconnected state and must be activated through the connection handshake process (CONNECT/CONNACK).
§Parameters
- version: The MQTT protocol version to use (V3_1_1 or V5_0)
§Returns
A new GenericConnection instance ready for use
§Examples
use mqtt_protocol_core::mqtt;
let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
let mut server = mqtt::connection::Connection::<mqtt::connection::role::Server>::new(mqtt::version::Version::V3_1_1);
pub fn checked_send<T>(&mut self, packet: T) -> Vec<GenericEvent<PacketIdType>>
where
    T: Sendable<Role, PacketIdType>,
Send MQTT packet with compile-time role checking (experimental)
This method provides compile-time verification that the packet being sent is allowed for the current connection role (Client or Server). It only works when the Role type parameter is concrete (not generic).
This is an experimental API that may be subject to change. For general use, consider using the send() method instead.
§Type Parameters
- T: The packet type, which must implement Sendable<Role, PacketIdType>
§Parameters
- packet: The MQTT packet to send
§Returns
A vector of events that the application must process
§Compile-time Safety
If the packet type is not allowed for the current role, this will result in a compile-time error, preventing protocol violations at development time.
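The mechanism behind this kind of check can be sketched with a marker trait that is parameterized by the role. Everything below is hypothetical (`SendableBy`, the unit-struct packets, and `ToyConnection` are invented for the example); the crate's own `Sendable<Role, PacketIdType>` trait may be defined differently.

```rust
use std::marker::PhantomData;

// Hypothetical stand-ins; not the crate's role, packet, or trait definitions.
struct Client;
struct Server;
struct Connect;
struct Connack;

/// Marker trait meaning "packet type `Self` may be sent by role `R`".
trait SendableBy<R> {
    fn name(&self) -> &'static str;
}

impl SendableBy<Client> for Connect {
    fn name(&self) -> &'static str {
        "CONNECT"
    }
}

impl SendableBy<Server> for Connack {
    fn name(&self) -> &'static str {
        "CONNACK"
    }
}

struct ToyConnection<R> {
    _role: PhantomData<R>,
}

impl<R> ToyConnection<R> {
    fn new() -> Self {
        ToyConnection { _role: PhantomData }
    }

    /// Only compiles when `T: SendableBy<R>` holds for the concrete role `R`.
    /// For example, `ToyConnection::<Client>::new().checked_send(Connack)`
    /// is rejected by the compiler because no such impl exists.
    fn checked_send<T: SendableBy<R>>(&mut self, packet: T) -> &'static str {
        packet.name()
    }
}
```

The trade-off is that the bound can only be resolved when the role is a concrete type, which matches the restriction stated for `checked_send` above.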
§Examples
use mqtt_protocol_core::mqtt;
// This works for concrete roles
let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
let events = client.checked_send(connect_packet); // OK - clients can send CONNECT
// This would cause a compile error
// let events = client.checked_send(connack_packet); // Error - clients cannot send CONNACK
pub fn send(
    &mut self,
    packet: GenericPacket<PacketIdType>,
) -> Vec<GenericEvent<PacketIdType>>
Send MQTT packet with runtime role validation
This is the primary method for sending MQTT packets. It accepts any GenericPacket and performs runtime validation to ensure the packet is allowed for the current connection role. This provides flexibility when the exact packet type is not known at compile time.
§Parameters
- packet: The MQTT packet to send
§Returns
A vector of events that the application must process. If the packet is not allowed for the current role, a NotifyError event will be included.
§Validation
The method validates:
- That the packet type is allowed for the connection role (Client vs Server)
- Protocol version compatibility
- Connection state requirements
- Packet ID management for QoS > 0 packets
§Examples
use mqtt_protocol_core::mqtt;

let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
let events = client.send(mqtt::packet::GenericPacket::V5_0(mqtt::packet::v5_0::Packet::Connect(connect_packet)));
for event in events {
    match event {
        mqtt::connection::Event::RequestSendPacket { packet, .. } => {
            // Send packet over network
        },
        mqtt::connection::Event::NotifyError(error) => {
            // Handle validation errors
        },
        // Handle other events...
        _ => {},
    }
}
pub fn recv(
    &mut self,
    data: &mut Cursor<&[u8]>,
) -> Vec<GenericEvent<PacketIdType>>
Receive and process incoming MQTT data
This method processes raw bytes received from the network and attempts to parse them into MQTT packets. It handles packet fragmentation and can process multiple complete packets from a single data buffer.
§Parameters
- data: A cursor over the received data bytes. The cursor position will be advanced as data is consumed.
§Returns
A vector of events generated from processing the received data:
- NotifyPacketReceived for successfully parsed packets
- NotifyError for parsing errors or protocol violations
- Additional events based on packet processing (timers, responses, etc.)
§Behavior
- Handles partial packets (data will be buffered until complete)
- Processes multiple complete packets in sequence
- Validates packet structure and protocol compliance
- Updates internal connection state based on received packets
- Generates appropriate response events (ACKs, etc.)
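The first two behaviors (buffering partial packets and splitting several packets out of one buffer) follow from MQTT's fixed header, which carries a one-byte type/flags field followed by a 1 to 4 byte variable-length "Remaining Length". A minimal sketch of that framing step is shown below. `Framer` and `FrameError` are hypothetical names for illustration; how the crate buffers data internally is not shown on this page.

```rust
/// Hypothetical helper, not part of the crate: accumulates bytes and yields
/// complete MQTT packets (fixed header + body) as they become available.
#[derive(Default)]
struct Framer {
    buf: Vec<u8>,
}

#[derive(Debug, PartialEq)]
enum FrameError {
    /// Remaining Length used more than the four bytes the spec allows.
    MalformedLength,
}

impl Framer {
    /// Append newly received bytes and return every packet completed so far.
    fn push(&mut self, data: &[u8]) -> Result<Vec<Vec<u8>>, FrameError> {
        self.buf.extend_from_slice(data);
        let mut packets: Vec<Vec<u8>> = Vec::new();
        while let Some(total) = Self::packet_len(&self.buf)? {
            if self.buf.len() < total {
                break; // body not complete yet; keep the bytes buffered
            }
            packets.push(self.buf.drain(..total).collect());
        }
        Ok(packets)
    }

    /// Total packet size if the fixed header is complete, `None` otherwise.
    fn packet_len(buf: &[u8]) -> Result<Option<usize>, FrameError> {
        let mut remaining = 0usize;
        // Byte 0 is type/flags; bytes 1..=4 hold the variable byte integer,
        // seven payload bits per byte, least significant group first.
        for i in 0..4 {
            let Some(&byte) = buf.get(1 + i) else {
                return Ok(None);
            };
            remaining |= ((byte & 0x7F) as usize) << (7 * i);
            if byte & 0x80 == 0 {
                return Ok(Some(1 + (i + 1) + remaining));
            }
        }
        Err(FrameError::MalformedLength)
    }
}
```

Feeding the framer arbitrary chunks, in whatever sizes the transport delivers them, produces the same sequence of packets as feeding it the whole stream at once.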
§Examples
use mqtt_protocol_core::mqtt;

let mut connection = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
let received_data = [/* network data */];
let mut cursor = std::io::Cursor::new(&received_data[..]);
let events = connection.recv(&mut cursor);
for event in events {
    match event {
        mqtt::connection::Event::NotifyPacketReceived(packet) => {
            // Process received packet
        },
        mqtt::connection::Event::NotifyError(error) => {
            // Handle parsing/protocol errors
        },
        // Handle other events...
        _ => {},
    }
}
pub fn notify_timer_fired(
    &mut self,
    kind: TimerKind,
) -> Vec<GenericEvent<PacketIdType>>
Notify that a timer has fired (Event-based API)
This method should be called by the application when the I/O layer detects that a previously requested timer has expired. The connection takes the action appropriate to the timer kind and returns the events that need to be processed.
§Parameters
- kind: The type of timer that fired
§Returns
Events generated from timer processing (e.g., sending PINGREQ, connection timeouts)
pub fn notify_closed(&mut self) -> Vec<GenericEvent<PacketIdType>>
Notify that the connection has been closed by the I/O layer (Event-based API)
This method should be called when the I/O layer detects that the underlying network connection (socket) has been closed, either intentionally or due to network issues. It updates the internal state accordingly and returns the events that need to be processed.
§Returns
Events generated from connection closure processing
pub fn set_pingreq_send_interval(
    &mut self,
    duration_ms: Option<u64>,
) -> Vec<GenericEvent<PacketIdType>>
Set the PINGREQ send interval
Overrides the interval for sending PINGREQ packets to keep the connection alive. Changing it may generate timer-related events that update the ping schedule.
PINGREQ is sent after the interval has elapsed since the last packet of any type was sent.
The send interval is taken from the first of the following that is set (highest priority first):
- User setting by this function
- ServerKeepAlive property value in received CONNACK packet
- KeepAlive value in sent CONNECT packet
§Parameters
- duration_ms: The interval override setting:
  - Some(value): Override with the specified number of milliseconds. If 0, no PINGREQ is sent.
  - None: Do not override; use the CONNACK ServerKeepAlive or CONNECT KeepAlive instead.
§Returns
Events generated from updating the ping interval
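The priority order described for this method can be written as a small pure function. This is a sketch under stated assumptions, not the crate's code: `effective_pingreq_interval_ms` is a hypothetical helper, the keep-alive values are assumed to be in seconds (as the MQTT specification defines them), and a keep-alive of 0 is assumed to disable pings in the same way an override of 0 does.

```rust
/// Hypothetical helper mirroring the documented priority order.
/// Assumptions: keep-alive values are in seconds, the override is in
/// milliseconds, and a result of `None` means "send no PINGREQ".
fn effective_pingreq_interval_ms(
    user_override_ms: Option<u64>,
    connack_server_keep_alive_s: Option<u16>,
    connect_keep_alive_s: u16,
) -> Option<u64> {
    let ms = match (user_override_ms, connack_server_keep_alive_s) {
        // 1. An explicit user setting always wins.
        (Some(ms), _) => ms,
        // 2. Otherwise the server's ServerKeepAlive from CONNACK.
        (None, Some(secs)) => u64::from(secs) * 1000,
        // 3. Otherwise the KeepAlive the client sent in CONNECT.
        (None, None) => u64::from(connect_keep_alive_s) * 1000,
    };
    // Zero disables keep-alive pings (assumed for the keep-alive cases).
    (ms != 0).then_some(ms)
}
```

For instance, with an override of `Some(5_000)` the function yields 5000 ms regardless of the other two values, while `None` with a ServerKeepAlive of 30 yields 30000 ms.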
pub fn get_receive_maximum_vacancy_for_send(&self) -> Option<u16>
Get the remaining capacity for sending PUBLISH packets
Returns the number of additional PUBLISH packets that can be sent without exceeding the receive maximum limit.
§Returns
The remaining capacity for outgoing PUBLISH packets, or None if no limit is set
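One way to picture this value is as a quota that shrinks when a PUBLISH is sent and grows again when its acknowledgement flow completes. The sketch below is hypothetical (`SendQuota` is not a crate type) and assumes, following the MQTT v5.0 Receive Maximum rules, that only QoS 1 and QoS 2 PUBLISH packets count against the limit.

```rust
/// Hypothetical flow-control counter; not the crate's internal bookkeeping.
struct SendQuota {
    /// Peer's Receive Maximum, or `None` when no limit applies.
    receive_maximum: Option<u16>,
    /// QoS 1/2 PUBLISH packets sent but not yet fully acknowledged.
    in_flight: u16,
}

impl SendQuota {
    /// Remaining capacity, or `None` if no limit is set.
    fn vacancy(&self) -> Option<u16> {
        self.receive_maximum
            .map(|max| max.saturating_sub(self.in_flight))
    }

    /// Returns false when sending now would exceed the peer's limit.
    fn try_reserve(&mut self) -> bool {
        if self.vacancy() == Some(0) {
            return false;
        }
        self.in_flight = self.in_flight.saturating_add(1);
        true
    }

    /// Called when PUBACK (QoS 1) or PUBCOMP (QoS 2) ends an exchange.
    fn release(&mut self) {
        self.in_flight = self.in_flight.saturating_sub(1);
    }
}
```

An application could consult the vacancy before calling `send()` with a PUBLISH and hold back messages while it is `Some(0)`.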
pub fn set_offline_publish(&mut self, enable: bool)
Enable or disable offline publishing
When enabled, PUBLISH packets can be sent even when disconnected. They will be queued and sent once the connection is established.
§Parameters
- enable: Whether to enable offline publishing
pub fn set_auto_pub_response(&mut self, enable: bool)
Enable or disable automatic PUBLISH response generation
When enabled, the appropriate response packets (PUBACK, PUBREC, PUBREL, and PUBCOMP) are automatically generated for received PUBLISH packets and for the follow-up packets of the QoS 2 flow.
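The pairing between a received packet and its response is fixed by the MQTT QoS flows, and can be summarized as a lookup. The enums and `auto_response` below are hypothetical names for illustration; the sketch also ignores MQTT v5.0 reason codes (for example, a PUBREC carrying an error ends the flow without a PUBREL).

```rust
// Hypothetical packet kinds for illustration; not the crate's packet types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Incoming {
    PublishQos0,
    PublishQos1,
    PublishQos2,
    Pubrec,
    Pubrel,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Response {
    Puback,
    Pubrec,
    Pubrel,
    Pubcomp,
}

/// Which response the receiver of `incoming` owes its peer, if any.
fn auto_response(incoming: Incoming) -> Option<Response> {
    match incoming {
        Incoming::PublishQos0 => None, // fire and forget
        Incoming::PublishQos1 => Some(Response::Puback),
        Incoming::PublishQos2 => Some(Response::Pubrec),
        Incoming::Pubrec => Some(Response::Pubrel), // sender side of QoS 2
        Incoming::Pubrel => Some(Response::Pubcomp), // receiver side of QoS 2
    }
}
```

With automatic responses disabled, the application would be expected to build and send these packets itself.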
§Parameters
- enable: Whether to enable automatic responses
pub fn set_auto_ping_response(&mut self, enable: bool)
Enable or disable automatic PING response generation
When enabled, PINGRESP packets are automatically sent in response to PINGREQ.
§Parameters
- enable: Whether to enable automatic PING responses
pub fn set_auto_map_topic_alias_send(&mut self, enable: bool)
Enable or disable automatic topic alias mapping for outgoing packets
When enabled, the connection will automatically map topics to aliases for outgoing PUBLISH packets to reduce bandwidth usage. This includes:
- Applying existing registered topic aliases when available
- Allocating new topic aliases for unregistered topics
- Using an LRU algorithm to overwrite the least recently used alias when all aliases are in use
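The three steps above can be sketched as a small sender-side table. This is an illustration under assumptions, not the crate's implementation: `AliasTable` and `Mapped` are hypothetical names, recency is tracked with a simple counter, and the linear scan for the least recently used entry is chosen for brevity rather than speed.

```rust
use std::collections::HashMap;

/// Hypothetical sender-side alias table; not the crate's implementation.
struct AliasTable {
    /// Peer's Topic Alias Maximum; 0 means aliases are not allowed.
    max: u16,
    /// topic -> (alias, last-used tick)
    by_topic: HashMap<String, (u16, u64)>,
    tick: u64,
}

/// What the outgoing PUBLISH should carry.
#[derive(Debug, PartialEq)]
enum Mapped {
    /// Alias already registered: the topic name can be omitted.
    AliasOnly(u16),
    /// New or overwritten alias: send topic name and alias together.
    Register(u16),
}

impl AliasTable {
    fn new(max: u16) -> Self {
        AliasTable { max, by_topic: HashMap::new(), tick: 0 }
    }

    /// Returns `None` when the peer does not accept topic aliases.
    fn map(&mut self, topic: &str) -> Option<Mapped> {
        if self.max == 0 {
            return None;
        }
        self.tick += 1;
        // Step 1: reuse an existing alias and refresh its recency.
        if let Some(entry) = self.by_topic.get_mut(topic) {
            entry.1 = self.tick;
            return Some(Mapped::AliasOnly(entry.0));
        }
        let alias = if self.by_topic.len() < usize::from(self.max) {
            // Step 2: allocate the next unused alias (aliases start at 1).
            self.by_topic.len() as u16 + 1
        } else {
            // Step 3: evict the least recently used entry, reuse its alias.
            let lru = self
                .by_topic
                .iter()
                .min_by_key(|(_, (_, used))| *used)
                .map(|(t, _)| t.clone())
                .expect("table is full, hence non-empty");
            self.by_topic.remove(&lru).expect("key was just found").0
        };
        self.by_topic.insert(topic.to_owned(), (alias, self.tick));
        Some(Mapped::Register(alias))
    }
}
```

With a maximum of 2, mapping "a", "b", "a", "c" registers aliases 1 and 2, reuses alias 1, and then overwrites alias 2 because "b" is the least recently used topic.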
§Parameters
- enable: Whether to enable automatic topic alias mapping
pub fn set_auto_replace_topic_alias_send(&mut self, enable: bool)
Enable or disable automatic topic alias replacement for outgoing packets
When enabled, the connection will automatically apply existing registered topic aliases to outgoing PUBLISH packets when aliases are available. This only uses previously registered aliases and does not allocate new ones.
§Parameters
- enable: Whether to enable automatic topic alias replacement
pub fn set_pingresp_recv_timeout(&mut self, timeout_ms: u64)
Set the PINGRESP receive timeout
Sets the timeout for receiving PINGRESP packets after sending PINGREQ packets. If PINGRESP is not received within this timeout, the connection is considered disconnected.
§Parameters
- timeout_ms: The timeout in milliseconds. Set to 0 to disable the timeout.
pub fn acquire_packet_id(&mut self) -> Result<PacketIdType, MqttError>
Acquire a new packet ID for outgoing packets
§Returns
A unique packet ID, or an error if none are available
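Packet ID management of this kind is commonly built on a set of IDs in use. The sketch below is hypothetical and fixed to `u16` for simplicity: `PacketIds` is not a crate type, and although its method names echo `acquire_packet_id`, `register_packet_id`, and `release_packet_id`, the semantics shown (in particular for register and release) are assumptions, since this page does not describe them.

```rust
use std::collections::HashSet;

/// Hypothetical allocator for u16 packet IDs (valid range 1..=65535).
#[derive(Default)]
struct PacketIds {
    in_use: HashSet<u16>,
    last: u16,
}

impl PacketIds {
    /// Hand out an unused ID, or `None` when all 65535 are taken.
    fn acquire(&mut self) -> Option<u16> {
        if self.in_use.len() == usize::from(u16::MAX) {
            return None;
        }
        loop {
            // Advance cyclically, skipping 0, which MQTT does not allow.
            self.last = if self.last == u16::MAX { 1 } else { self.last + 1 };
            if self.in_use.insert(self.last) {
                return Some(self.last);
            }
        }
    }

    /// Mark a caller-chosen ID as used; false if it is 0 or already taken.
    fn register(&mut self, id: u16) -> bool {
        id != 0 && self.in_use.insert(id)
    }

    /// Return an ID to the pool; false if it was not in use.
    fn release(&mut self, id: u16) -> bool {
        self.in_use.remove(&id)
    }
}
```

Cycling forward from the last issued ID, rather than always picking the smallest free one, reduces the chance that a recently released ID is reused while a delayed acknowledgement for it is still in transit.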
pub fn register_packet_id(
    &mut self,
    packet_id: PacketIdType,
) -> Result<(), MqttError>
pub fn release_packet_id(
    &mut self,
    packet_id: PacketIdType,
) -> Vec<GenericEvent<PacketIdType>>
pub fn get_qos2_publish_handled(&self) -> HashSet<PacketIdType>
Get the set of QoS 2 PUBLISH packet IDs that have been handled
Returns a copy of the set containing packet IDs of QoS 2 PUBLISH packets that have been successfully processed and handled.
§Returns
A HashSet containing packet IDs of handled QoS 2 PUBLISH packets
pub fn restore_qos2_publish_handled(&mut self, pids: HashSet<PacketIdType>)
Restore the set of QoS 2 PUBLISH packet IDs that have been handled
Restores the internal state of handled QoS 2 PUBLISH packet IDs, typically used when resuming a connection from persistent storage.
§Parameters
- pids: A HashSet containing packet IDs of previously handled QoS 2 PUBLISH packets
pub fn restore_packets(
    &mut self,
    packets: Vec<GenericStorePacket<PacketIdType>>,
)
Restore previously stored packets
This method restores packets that were previously stored for persistence, typically called when resuming a session.
§Parameters
- packets: Vector of packets to restore
pub fn get_stored_packets(&self) -> Vec<GenericStorePacket<PacketIdType>>
Get stored packets for persistence
Returns packets that need to be stored for potential retransmission. This is useful for implementing persistent sessions.
§Returns
Vector of packets that should be persisted
pub fn get_protocol_version(&self) -> Version
pub fn regulate_for_store(
    &self,
    packet: GenericPublish<PacketIdType>,
) -> Result<GenericPublish<PacketIdType>, MqttError>
Regulate packet for store (remove/resolve topic alias)
This method prepares a V5.0 publish packet for storage by resolving topic aliases and removing TopicAlias properties to ensure the packet can be retransmitted correctly.
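The reason for this step is that topic alias mappings in MQTT v5.0 last only for a single network connection, so a stored packet that relies on an alias could not be interpreted after a reconnect. A minimal sketch of the idea follows. `ToyPublish` and the free function below are hypothetical simplifications; the real method operates on `GenericPublish` and the connection's own alias state.

```rust
use std::collections::HashMap;

/// Hypothetical, stripped-down PUBLISH; not the crate's `GenericPublish`.
#[derive(Debug, Clone, PartialEq)]
struct ToyPublish {
    /// May be empty when only an alias is carried.
    topic: String,
    /// The TopicAlias property, if present.
    topic_alias: Option<u16>,
}

/// Produce a packet that is safe to retransmit on a later connection:
/// the topic name is filled in and the alias property is removed.
fn regulate_for_store(
    mut packet: ToyPublish,
    send_aliases: &HashMap<u16, String>,
) -> Result<ToyPublish, &'static str> {
    // `take()` removes the alias property whether or not it is needed.
    if let Some(alias) = packet.topic_alias.take() {
        if packet.topic.is_empty() {
            packet.topic = send_aliases
                .get(&alias)
                .cloned()
                .ok_or("unknown topic alias")?;
        }
    }
    if packet.topic.is_empty() {
        return Err("empty topic name");
    }
    Ok(packet)
}
```

A packet that already carries its full topic name passes through with only the alias property dropped, while an alias that cannot be resolved is reported as an error rather than stored in an unusable form.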