§Complete MQTT v5.0 Platform
A complete MQTT v5.0 platform that provides both a high-performance async client library and a full-featured broker implementation. Features include certificate loading from bytes, multi-transport support (TCP, TLS, WebSocket), authentication, bridging, and comprehensive testing.
§Architecture
This library uses Rust’s native async/await patterns throughout:
- Direct async methods for all operations
- Background async tasks for continuous operations (packet reading, keepalive)
- The Tokio runtime for task scheduling
For architectural details, see ARCHITECTURE.md.
§Quick Start
use mqtt5::MqttClient;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = MqttClient::new("test-client");

    // Direct async connect
    client.connect("mqtt://test.mosquitto.org:1883").await?;

    // Direct async subscribe with callback
    client.subscribe("sensors/+/data", |msg| {
        println!("Received {} on {}",
            String::from_utf8_lossy(&msg.payload),
            msg.topic);
    }).await?;

    // Direct async publish
    client.publish("sensors/temp/data", b"25.5").await?;

    Ok(())
}
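The subscription above uses the single-level wildcard `+`. As a rough illustration of how MQTT topic filters match topic names, here is a minimal std-only sketch. The function name `matches_filter` is hypothetical and is not this crate's implementation (the crate re-exports its own `topic_matches_filter`). It assumes both arguments are already well-formed.

```rust
/// Hypothetical helper (not part of mqtt5): returns true if `topic`
/// matches `filter` under the MQTT wildcard rules.
pub fn matches_filter(topic: &str, filter: &str) -> bool {
    // Topics beginning with '$' (e.g. $SYS/...) are not matched by
    // filters that begin with a wildcard.
    if topic.starts_with('$') && (filter.starts_with('+') || filter.starts_with('#')) {
        return false;
    }

    let mut topic_levels = topic.split('/');
    let mut filter_levels = filter.split('/');

    loop {
        match (filter_levels.next(), topic_levels.next()) {
            // '#' matches all remaining levels, including none at all,
            // but it must be the last level of the filter.
            (Some("#"), _) => return filter_levels.next().is_none(),
            // '+' matches exactly one level, whatever its content.
            (Some("+"), Some(_)) => continue,
            // Literal levels must be identical.
            (Some(f), Some(t)) if f == t => continue,
            // Both exhausted at the same time: full match.
            (None, None) => return true,
            // Length mismatch or differing literal level.
            _ => return false,
        }
    }
}
```

With this sketch, `sensors/+/data` matches `sensors/temp/data` but not `sensors/temp/raw/data`, and `sensors/#` also matches the parent topic `sensors`.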
§Advanced Example
use mqtt5::{MqttClient, ConnectOptions, PublishOptions, QoS, ConnectionEvent};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure connection options
    let options = ConnectOptions::new("weather-station")
        .with_clean_start(false) // Resume previous session
        .with_keep_alive(Duration::from_secs(30))
        .with_automatic_reconnect(true)
        .with_reconnect_delay(Duration::from_secs(5), Duration::from_secs(60));

    let client = MqttClient::with_options(options);

    // Monitor connection events
    client.on_connection_event(|event| {
        match event {
            ConnectionEvent::Connected { session_present } => {
                println!("Connected! Session present: {}", session_present);
            }
            ConnectionEvent::Disconnected { reason } => {
                println!("Disconnected: {:?}", reason);
            }
            ConnectionEvent::Reconnecting { attempt } => {
                println!("Reconnecting... attempt {}", attempt);
            }
            ConnectionEvent::ReconnectFailed { error } => {
                println!("Reconnection failed: {}", error);
            }
        }
    }).await?;

    // Connect to broker
    client.connect("mqtts://broker.example.com:8883").await?;

    // Subscribe to alerts and distinguish retained messages from new ones
    client.subscribe("weather/+/alerts", |msg| {
        if msg.retain {
            println!("Retained alert: {}", String::from_utf8_lossy(&msg.payload));
        } else {
            println!("New alert: {}", String::from_utf8_lossy(&msg.payload));
        }
    }).await?;

    // Publish with custom options (QoS 2, retained, expiring after one hour)
    let mut pub_opts = PublishOptions::default();
    pub_opts.qos = QoS::ExactlyOnce;
    pub_opts.retain = true;
    pub_opts.properties.message_expiry_interval = Some(3600); // 1 hour

    client.publish_with_options(
        "weather/station01/temperature",
        b"25.5",
        pub_opts
    ).await?;

    // Keep running
    tokio::time::sleep(Duration::from_secs(3600)).await;

    Ok(())
}
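The example passes two durations to `with_reconnect_delay` (5 s and 60 s), which read as an initial and a maximum delay. One common way to move between such bounds is capped exponential backoff. The sketch below illustrates that policy with std only. It is an assumption for illustration: the name `reconnect_delay` is hypothetical, and the crate may schedule reconnect attempts differently.

```rust
use std::time::Duration;

/// Hypothetical helper (not part of mqtt5): delay to wait before reconnect
/// attempt number `attempt` (1-based). The delay starts at `initial`,
/// doubles on every further attempt, and never exceeds `max`.
pub fn reconnect_delay(attempt: u32, initial: Duration, max: Duration) -> Duration {
    // Clamp the exponent so the shift below cannot overflow a u32.
    let exponent = attempt.saturating_sub(1).min(31);
    let factor = 1u32 << exponent;

    // checked_mul returns None on overflow; fall back to the cap in that case.
    initial
        .checked_mul(factor)
        .map_or(max, |delay| delay.min(max))
}
```

With an initial delay of 5 s and a cap of 60 s, this policy yields 5 s, 10 s, 20 s, 40 s, and then 60 s for every later attempt.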
§Broker Example
This library also provides a complete MQTT broker implementation:
use mqtt5::broker::MqttBroker;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a basic broker
    let mut broker = MqttBroker::bind("0.0.0.0:1883").await?;
    println!("🚀 MQTT broker running on port 1883");

    // Run until shutdown
    broker.run().await?;

    Ok(())
}
§Advanced Broker with Multi-Transport
use mqtt5::broker::{BrokerConfig, MqttBroker, TlsConfig, WebSocketConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = BrokerConfig::default()
        // TCP on port 1883
        .with_bind_address("0.0.0.0:1883".parse()?)
        // TLS on port 8883
        .with_tls(
            TlsConfig::new("certs/server.crt".into(), "certs/server.key".into())
                .with_bind_address("0.0.0.0:8883".parse()?)
        )
        // WebSocket on port 8080
        .with_websocket(
            WebSocketConfig::default()
                .with_bind_address("0.0.0.0:8080".parse()?)
                .with_path("/mqtt")
        );

    let mut broker = MqttBroker::with_config(config).await?;

    println!("🚀 Multi-transport MQTT broker running");
    println!(" 📡 TCP: mqtt://localhost:1883");
    println!(" 🔒 TLS: mqtts://localhost:8883");
    println!(" 🌐 WebSocket: ws://localhost:8080/mqtt");

    broker.run().await?;

    Ok(())
}
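Each listener address in the configuration above comes from `"...".parse()?`. Assuming the `with_bind_address` builders take a `std::net::SocketAddr` (the example does not show the parameter type), the listener addresses can be validated up front with std alone. The helper name `parse_listeners` is hypothetical.

```rust
use std::net::{AddrParseError, SocketAddr};

/// Hypothetical helper (not part of mqtt5): parse every listener address,
/// failing on the first one that is not a literal `ip:port` pair.
pub fn parse_listeners(addrs: &[&str]) -> Result<Vec<SocketAddr>, AddrParseError> {
    addrs
        .iter()
        .map(|addr| addr.parse::<SocketAddr>())
        .collect()
}
```

`SocketAddr` parsing does not resolve hostnames, so under this assumption a value such as `"localhost:1883"` is rejected and the bind address has to be written as an IP literal, as in the example.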
Re-exports§
pub use client::ConnectionEvent;
pub use client::DisconnectReason;
pub use client::MockCall;
pub use client::MockMqttClient;
pub use client::MqttClient;
pub use client::MqttClientTrait;
pub use error::MqttError;
pub use error::Result;
pub use packet::publish::PublishPacket;
pub use packet::FixedHeader;
pub use packet::Packet;
pub use packet::PacketType;
pub use protocol::v5::properties::Properties;
pub use protocol::v5::properties::PropertyId;
pub use protocol::v5::properties::PropertyValue;
pub use protocol::v5::properties::PropertyValueType;
pub use types::ConnectOptions;
pub use types::ConnectProperties;
pub use types::ConnectResult;
pub use types::ConnectionStats;
pub use types::Message;
pub use types::MessageProperties;
pub use types::PublishOptions;
pub use types::PublishProperties;
pub use types::PublishResult;
pub use types::RetainHandling;
pub use types::SubscribeOptions;
pub use types::WillMessage;
pub use types::WillProperties;
pub use validation::is_valid_client_id;
pub use validation::is_valid_topic_filter;
pub use validation::is_valid_topic_name;
pub use validation::topic_matches_filter;
pub use validation::validate_client_id;
pub use validation::validate_topic_filter;
pub use validation::validate_topic_name;
pub use validation::RestrictiveValidator;
pub use validation::StandardValidator;
pub use validation::TopicValidator;
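The `validation` re-exports above check client IDs, topic names, and topic filters. For orientation, the following std-only sketch shows the kind of rules the MQTT specification places on topic names and filters. The names `topic_name_ok` and `topic_filter_ok` are hypothetical. The crate's own validators may apply additional or different rules, and this sketch ignores shared-subscription (`$share/...`) syntax.

```rust
/// Maximum length of an MQTT UTF-8 string, in bytes.
const MAX_TOPIC_BYTES: usize = 65_535;

/// Rules shared by topic names and topic filters: non-empty, within the
/// length limit, and free of the NUL character.
fn has_valid_shape(s: &str) -> bool {
    !s.is_empty() && s.len() <= MAX_TOPIC_BYTES && !s.contains('\0')
}

/// Hypothetical check (not part of mqtt5): a topic name used for
/// publishing must not contain wildcard characters.
pub fn topic_name_ok(name: &str) -> bool {
    has_valid_shape(name) && !name.contains(|c: char| c == '+' || c == '#')
}

/// Hypothetical check (not part of mqtt5): in a topic filter, '+' and '#'
/// must each occupy a whole level, and '#' may only be the last level.
pub fn topic_filter_ok(filter: &str) -> bool {
    if !has_valid_shape(filter) {
        return false;
    }

    // A non-empty string always splits into at least one level.
    let levels: Vec<&str> = filter.split('/').collect();
    let last = levels.len() - 1;

    levels.iter().enumerate().all(|(i, level)| match *level {
        "#" => i == last,
        "+" => true,
        // Any other level must not embed a wildcard character.
        other => !other.contains(|c: char| c == '+' || c == '#'),
    })
}
```

Under these rules `sensors/+/data` is a valid filter but not a valid topic name, while `sensors/#/data` and `sensors/temp+/data` are rejected as filters.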
Modules§
- broker: MQTT v5.0 Broker Implementation
- callback
- client: MQTT v5.0 Client - Direct Async Implementation
- constants: MQTT Protocol Constants
- encoding
- error
- flags: MQTT packet flag definitions using BeBytes v2.1.0 flag decomposition
- packet
- packet_id: Packet ID generation for MQTT
- protocol
- session
- tasks: Background async tasks for MQTT client
- test_utils: Test utilities for the MQTT library
- topic_matching
- transport
- types
- validation
Macros§
- test_timeout: Helper macro for async tests with timeout