§Complete MQTT v5.0 Platform
A complete MQTT v5.0 platform providing both a high-performance async client library and a full-featured broker implementation. Features include certificate loading from bytes, multi-transport support (TCP, TLS, WebSocket), authentication, bridging, and comprehensive testing.
§Architecture
This library uses Rust’s native async/await patterns throughout:
- Direct async methods for all operations
- Background async tasks for continuous operations (packet reading, keepalive)
- The Tokio runtime for task scheduling
For architectural details, see ARCHITECTURE.md.
§Quick Start
use mqtt5::{MqttClient, ConnectOptions, QoS};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = MqttClient::new("test-client");

    // Direct async connect
    client.connect("mqtt://test.mosquitto.org:1883").await?;

    // Direct async subscribe with callback
    client.subscribe("sensors/+/data", |msg| {
        println!("Received {} on {}",
            String::from_utf8_lossy(&msg.payload),
            msg.topic);
    }).await?;

    // Direct async publish
    client.publish("sensors/temp/data", b"25.5").await?;

    Ok(())
}
§Advanced Example
use mqtt5::{MqttClient, ConnectOptions, PublishOptions, QoS, ConnectionEvent};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure connection options
    let options = ConnectOptions::new("weather-station")
        .with_clean_start(false) // Resume previous session
        .with_keep_alive(Duration::from_secs(30))
        .with_automatic_reconnect(true)
        .with_reconnect_delay(Duration::from_secs(5), Duration::from_secs(60));

    let client = MqttClient::with_options(options);

    // Monitor connection events
    client.on_connection_event(|event| {
        match event {
            ConnectionEvent::Connected { session_present } => {
                println!("Connected! Session present: {}", session_present);
            }
            ConnectionEvent::Disconnected { reason } => {
                println!("Disconnected: {:?}", reason);
            }
            ConnectionEvent::Reconnecting { attempt } => {
                println!("Reconnecting... attempt {}", attempt);
            }
            ConnectionEvent::ReconnectFailed { error } => {
                println!("Reconnection failed: {}", error);
            }
        }
    }).await?;

    // Connect to broker
    client.connect("mqtts://broker.example.com:8883").await?;

    // Subscribe to alerts and distinguish retained messages from live ones
    client.subscribe("weather/+/alerts", |msg| {
        if msg.retain {
            println!("Retained alert: {}", String::from_utf8_lossy(&msg.payload));
        } else {
            println!("New alert: {}", String::from_utf8_lossy(&msg.payload));
        }
    }).await?;

    // Publish with custom options: QoS 2, retained, expiring after one hour
    let mut pub_opts = PublishOptions::default();
    pub_opts.qos = QoS::ExactlyOnce;
    pub_opts.retain = true;
    pub_opts.properties.message_expiry_interval = Some(3600); // 1 hour

    client.publish_with_options(
        "weather/station01/temperature",
        b"25.5",
        pub_opts
    ).await?;

    // Keep the process alive for an hour so callbacks can fire
    tokio::time::sleep(Duration::from_secs(3600)).await;

    Ok(())
}
§Broker Example
This library also provides a complete MQTT broker implementation:
use mqtt5::broker::{BrokerConfig, MqttBroker};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a basic broker
    let mut broker = MqttBroker::bind("0.0.0.0:1883").await?;
    println!("🚀 MQTT broker running on port 1883");

    // Run until shutdown
    broker.run().await?;

    Ok(())
}
§Advanced Broker with Multi-Transport
use mqtt5::broker::{BrokerConfig, MqttBroker};
use mqtt5::broker::config::{TlsConfig, WebSocketConfig};
use std::net::SocketAddr;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = BrokerConfig::default()
        // TCP on port 1883
        .with_bind_address("0.0.0.0:1883".parse::<SocketAddr>()?)
        // TLS on port 8883
        .with_tls(
            TlsConfig::new("certs/server.crt".into(), "certs/server.key".into())
                .with_bind_address("0.0.0.0:8883".parse::<SocketAddr>()?)
        )
        // WebSocket on port 8080
        .with_websocket(
            WebSocketConfig::default()
                .with_bind_address("0.0.0.0:8080".parse::<SocketAddr>()?)
                .with_path("/mqtt")
        );

    let mut broker = MqttBroker::with_config(config).await?;

    println!("🚀 Multi-transport MQTT broker running");
    println!(" 📡 TCP: mqtt://localhost:1883");
    println!(" 🔒 TLS: mqtts://localhost:8883");
    println!(" 🌐 WebSocket: ws://localhost:8080/mqtt");

    broker.run().await?;

    Ok(())
}
Re-exports§
pub use client::ConnectionEvent;
pub use client::DisconnectReason;
pub use client::MockCall;
pub use client::MockMqttClient;
pub use client::MqttClient;
pub use client::MqttClientTrait;
pub use types::ConnectOptions;
pub use types::ConnectionStats;
Modules§
- broker
- MQTT v5.0 Broker Implementation
- callback
- client
- MQTT v5.0 Client - Direct Async Implementation
- constants
- MQTT Protocol Constants
- encoding
- error
- flags
- MQTT packet flag definitions using BeBytes v2.1.0 flag decomposition
- packet
- packet_id
- Packet ID generation for MQTT
- protocol
- qos2
- session
- tasks
- Background async tasks for MQTT client
- telemetry
- test_utils
- Test utilities for the MQTT library
- time
- topic_matching
- transport
- types
- validation
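The packet_id module is described as generating MQTT packet IDs. As a rough illustration of what such generation involves, the sketch below hands out 16-bit identifiers and skips zero, which the MQTT specification reserves as invalid. `PacketIdGenerator` and `next_id` are hypothetical names chosen for this example and are not taken from the crate.

```rust
/// Hypothetical packet identifier allocator; not the crate's actual type.
/// MQTT packet identifiers are 16-bit values and must be non-zero.
pub struct PacketIdGenerator {
    next: u16,
}

impl PacketIdGenerator {
    /// Starts counting at 1, the smallest valid packet identifier.
    pub fn new() -> Self {
        PacketIdGenerator { next: 1 }
    }

    /// Returns the next identifier, wrapping from 65535 back to 1 so that
    /// 0 is never produced.
    pub fn next_id(&mut self) -> u16 {
        let id = self.next;
        self.next = if id == u16::MAX { 1 } else { id + 1 };
        id
    }
}
```

A production allocator would also need to skip identifiers that are still in flight for unacknowledged QoS 1 and QoS 2 exchanges; this sketch leaves that out.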
Macros§
- test_timeout
- Helper macro for async tests with timeout
Structs§
- ConnectProperties
- ConnectResult
- FixedHeader
- MQTT packet fixed header
- Message
- MessageProperties
- Properties
- Container for MQTT v5.0 properties
- PublishOptions
- PublishProperties
- RestrictiveValidator
- Restrictive validator with additional constraints
- StandardValidator
- Standard MQTT specification validator
- SubscribeOptions
- WillMessage
- WillProperties
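For readers unfamiliar with the MQTT packet fixed header that FixedHeader represents, the sketch below shows the wire layout defined by the MQTT specification: one byte holding the packet type (upper four bits) and flags (lower four bits), followed by the remaining length as a Variable Byte Integer. `SketchFixedHeader` and both helper functions are hypothetical names for this example; the crate's own `FixedHeader` may expose different fields and methods.

```rust
/// Hypothetical stand-in for a fixed header; the crate's `FixedHeader` may differ.
#[derive(Debug, PartialEq, Eq)]
pub struct SketchFixedHeader {
    pub packet_type: u8,       // upper 4 bits of the first byte
    pub flags: u8,             // lower 4 bits of the first byte
    pub remaining_length: u32, // Variable Byte Integer, at most 268_435_455
}

/// Appends `value` to `out` as an MQTT Variable Byte Integer (1 to 4 bytes).
pub fn encode_variable_byte_integer(mut value: u32, out: &mut Vec<u8>) -> Result<(), &'static str> {
    if value > 268_435_455 {
        return Err("value exceeds Variable Byte Integer range");
    }
    loop {
        let mut byte = (value % 128) as u8;
        value /= 128;
        if value > 0 {
            byte |= 0x80; // continuation bit: more bytes follow
        }
        out.push(byte);
        if value == 0 {
            return Ok(());
        }
    }
}

/// Decodes a Variable Byte Integer, returning the value and the bytes consumed.
pub fn decode_variable_byte_integer(bytes: &[u8]) -> Result<(u32, usize), &'static str> {
    let mut value: u32 = 0;
    for (i, &byte) in bytes.iter().enumerate().take(4) {
        value |= u32::from(byte & 0x7F) << (7 * i);
        if byte & 0x80 == 0 {
            return Ok((value, i + 1));
        }
    }
    Err("malformed or truncated Variable Byte Integer")
}

impl SketchFixedHeader {
    /// Serializes the header to its wire form.
    pub fn encode(&self) -> Result<Vec<u8>, &'static str> {
        if self.packet_type > 15 || self.flags > 15 {
            return Err("packet type and flags must each fit in 4 bits");
        }
        let mut out = vec![(self.packet_type << 4) | self.flags];
        encode_variable_byte_integer(self.remaining_length, &mut out)?;
        Ok(out)
    }

    /// Parses a header from the start of `bytes`, returning it with its length.
    pub fn decode(bytes: &[u8]) -> Result<(Self, usize), &'static str> {
        let (&first, rest) = bytes.split_first().ok_or("empty input")?;
        let (remaining_length, used) = decode_variable_byte_integer(rest)?;
        let header = SketchFixedHeader {
            packet_type: first >> 4,
            flags: first & 0x0F,
            remaining_length,
        };
        Ok((header, 1 + used))
    }
}
```

With this sketch, a PUBLISH header (packet type 3) with flags `0b0010` and a remaining length of 321 encodes to the three bytes `0x32 0xC1 0x02`.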
Enums§
- MqttError
- MQTT protocol errors
- Packet
- Enum representing all MQTT packet types
- PacketType
- PropertyId
- MQTT v5.0 Property Identifiers
- PropertyValue
- Property value storage
- PropertyValueType
- Property value types
- PublishResult
- QoS
- RetainHandling
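The QoS enum carries no description on this page, so the following sketch illustrates the three quality-of-service levels as the MQTT specification defines them, along with the rule that a message is delivered at the lower of its published QoS and the QoS granted to the subscription. `SketchQoS`, `from_u8`, and `effective_qos` are hypothetical names; only the `ExactlyOnce` variant name appears in the examples above, and the other two variant names are assumed from the specification's terminology.

```rust
/// Hypothetical mirror of the MQTT QoS levels; the crate's `QoS` may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SketchQoS {
    AtMostOnce = 0,  // fire and forget
    AtLeastOnce = 1, // acknowledged with PUBACK, may be duplicated
    ExactlyOnce = 2, // four-packet handshake, no duplicates
}

impl SketchQoS {
    /// Converts the two-bit wire value; 3 is reserved and rejected.
    pub fn from_u8(value: u8) -> Option<SketchQoS> {
        match value {
            0 => Some(SketchQoS::AtMostOnce),
            1 => Some(SketchQoS::AtLeastOnce),
            2 => Some(SketchQoS::ExactlyOnce),
            _ => None,
        }
    }
}

/// QoS used for delivery to a subscriber: the minimum of the published QoS
/// and the maximum QoS granted to the subscription.
pub fn effective_qos(published: SketchQoS, granted: SketchQoS) -> SketchQoS {
    published.min(granted)
}
```

For instance, a message published with `ExactlyOnce` reaches a subscriber that was granted `AtLeastOnce` at `AtLeastOnce`.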
Traits§
- TopicValidator
- Trait for pluggable topic validation
- Transport
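To give a feel for what pluggable topic validation can look like, here is a minimal sketch of a validator trait with a specification-level implementation and a stricter wrapper that layers an extra constraint on top. The trait `SketchTopicValidator`, its method, and the types `SpecValidator` and `PrefixValidator` are all hypothetical; the real `TopicValidator` trait and the crate's `StandardValidator` and `RestrictiveValidator` may have different signatures and rules.

```rust
/// Hypothetical validation trait; not the crate's `TopicValidator` signature.
pub trait SketchTopicValidator {
    fn validate_topic_name(&self, topic: &str) -> Result<(), String>;
}

/// Applies the basic MQTT rules for topic names.
pub struct SpecValidator;

impl SketchTopicValidator for SpecValidator {
    fn validate_topic_name(&self, topic: &str) -> Result<(), String> {
        if topic.is_empty() {
            return Err("topic name must not be empty".to_string());
        }
        if topic.len() > 65_535 {
            return Err("topic name exceeds 65535 bytes".to_string());
        }
        if topic.contains('\0') {
            return Err("topic name must not contain a null character".to_string());
        }
        if topic.contains('+') || topic.contains('#') {
            return Err("wildcards are not allowed in topic names".to_string());
        }
        Ok(())
    }
}

/// Wraps another validator and additionally requires a fixed topic prefix.
pub struct PrefixValidator<V: SketchTopicValidator> {
    pub inner: V,
    pub required_prefix: String,
}

impl<V: SketchTopicValidator> SketchTopicValidator for PrefixValidator<V> {
    fn validate_topic_name(&self, topic: &str) -> Result<(), String> {
        // Run the wrapped validator first, then the extra constraint.
        self.inner.validate_topic_name(topic)?;
        if topic.starts_with(self.required_prefix.as_str()) {
            Ok(())
        } else {
            Err(format!("topic must start with {}", self.required_prefix))
        }
    }
}
```

Composing validators this way keeps the specification checks in one place while letting an application add its own policy, which is one plausible reason to make validation pluggable.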
Functions§
- is_valid_client_id
- Validates an MQTT client identifier according to MQTT v5.0 specification
- is_valid_topic_filter
- Validates an MQTT topic filter according to MQTT v5.0 specification
- is_valid_topic_name
- Validates an MQTT topic name according to MQTT v5.0 specification
- topic_matches_filter
- Checks if a topic name matches a topic filter
- validate_client_id
- Validates a client ID and returns an error if invalid
- validate_topic_filter
- Validates a topic filter and returns an error if invalid
- validate_topic_name
- Validates a topic name and returns an error if invalid
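The filter validation and topic matching described above follow the wildcard rules of the MQTT specification: `+` matches exactly one level, `#` matches any number of trailing levels (including the parent level) and must be the last level, and topics beginning with `$` are not matched by filters that start with a wildcard. The sketch below is one way to implement those rules. The `sketch_` function names and their argument order are hypothetical and should not be read as the signatures of the crate's `is_valid_topic_filter` or `topic_matches_filter`.

```rust
/// Hypothetical filter check: wildcards must occupy a whole level and
/// `#` may only appear as the final level.
pub fn sketch_is_valid_topic_filter(filter: &str) -> bool {
    if filter.is_empty() || filter.len() > 65_535 || filter.contains('\0') {
        return false;
    }
    let levels: Vec<&str> = filter.split('/').collect();
    let last = levels.len() - 1;
    levels.iter().enumerate().all(|(i, level)| match *level {
        "+" => true,
        "#" => i == last,
        other => !other.contains('+') && !other.contains('#'),
    })
}

/// Hypothetical matcher; assumes `filter` already passed the check above.
pub fn sketch_topic_matches_filter(topic: &str, filter: &str) -> bool {
    // Topics such as "$SYS/..." are hidden from filters starting with a wildcard.
    if topic.starts_with('$') && (filter.starts_with('+') || filter.starts_with('#')) {
        return false;
    }
    let mut topic_levels = topic.split('/');
    let mut filter_levels = filter.split('/');
    loop {
        match (filter_levels.next(), topic_levels.next()) {
            // "#" matches the rest of the topic, including zero further levels.
            (Some("#"), _) => return true,
            // "+" matches any single level.
            (Some("+"), Some(_)) => {}
            // Literal levels must be identical.
            (Some(f), Some(t)) if f == t => {}
            // Both exhausted at the same time: full match.
            (None, None) => return true,
            _ => return false,
        }
    }
}
```

Under these rules the filter `sensors/+/data` from the Quick Start matches `sensors/temp/data` but not `sensors/temp`, and `weather/#` matches `weather` itself as well as anything below it.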