mqtt5
Full-featured MQTT v5.0 and v3.1.1 client and broker for native platforms (Linux, macOS, Windows). This is the primary crate for building MQTT applications — it provides both the async client for connecting to any MQTT broker and a production-ready broker implementation with multi-transport support, authentication, and bridging. For browser environments, see the companion mqtt5-wasm crate.
Features
- MQTT v5.0 and v3.1.1 protocol support
- Multiple transports: TCP, TLS, WebSocket, QUIC
- QUIC multistream support with flow headers
- QUIC connection migration for mobile clients
- Automatic reconnection with exponential backoff
- Configurable keepalive with timeout tolerance
- Mock client for unit testing
- OpenTelemetry distributed tracing (optional feature)
Installation
[dependencies]
mqtt5 = "0.31"
Quick Start
Client
use ;
async
Broker
use ;
async
Broker
The broker handles the full MQTT v5.0 and v3.1.1 protocol with cross-version interoperability. It supports QoS 0, 1, and 2 with proper flow control, session persistence (clean start, session expiry, message queuing), retained messages, shared subscriptions for load balancing across clients, and will messages (Last Will and Testament).
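As a rough illustration of shared subscriptions, the sketch below parses the MQTT v5 `$share/<group>/<filter>` filter syntax and rotates deliveries across group members. The `parse_shared_filter` function and `SharedGroup` type are hypothetical names for this example, and round-robin selection is an assumption; neither is taken from the crate's API.

```rust
/// Splits an MQTT v5 shared-subscription filter of the form
/// `$share/<group>/<filter>` into `(group, filter)`.
/// Returns `None` for ordinary (non-shared) or malformed filters.
fn parse_shared_filter(filter: &str) -> Option<(&str, &str)> {
    let rest = filter.strip_prefix("$share/")?;
    let (group, topic_filter) = rest.split_once('/')?;
    // The share name must be non-empty and must not contain wildcards.
    if group.is_empty() || group.contains('+') || group.contains('#') {
        return None;
    }
    if topic_filter.is_empty() {
        return None;
    }
    Some((group, topic_filter))
}

/// Hypothetical round-robin picker (an assumption for this sketch):
/// each message for the group goes to the next member in turn.
struct SharedGroup {
    members: Vec<String>,
    next: usize,
}

impl SharedGroup {
    fn new(members: Vec<String>) -> Self {
        Self { members, next: 0 }
    }

    /// Returns the client ID that should receive the next message, if any.
    fn pick(&mut self) -> Option<&str> {
        if self.members.is_empty() {
            return None;
        }
        let idx = self.next % self.members.len();
        self.next = (idx + 1) % self.members.len();
        Some(self.members[idx].as_str())
    }
}
```

With two members in a group, successive calls to `pick` alternate between them, which is the load-balancing effect described above.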
Transport & Security
Four transport types can run simultaneously in a single broker instance. TCP serves standard MQTT on port 1883. TLS/SSL provides encrypted MQTT on port 8883 with certificate authentication and client certificate validation. WebSocket enables browser-based MQTT connections. QUIC offers a modern UDP-based transport with built-in TLS 1.3 and multistream support.
Authentication
The broker supports multiple authentication methods:
| Method | Description | Use Case |
|---|---|---|
| Password | Argon2id hashed passwords | Internal users |
| SCRAM-SHA-256 | Challenge-response, no password transmission | High security |
| JWT | Stateless token verification | Single IdP |
| Federated JWT | Multi-issuer with JWKS auto-refresh | Google, Keycloak, Azure AD |
use BrokerConfig;
use ;
let config = default
.with_auth;
Use CompositeAuthProvider to chain enhanced auth with a password fallback for internal service clients:
use ;
use Arc;
let primary = broker.auth_provider;
let fallback = new;
let broker = broker.with_auth_provider;
See Authentication & Authorization Guide for full details.
Advanced Features
Change-Only Delivery
Reduces bandwidth for topics that frequently publish unchanged values (common with sensors).
use ;
let config = new
.with_change_only_delivery;
How it works:
- Broker tracks last payload hash per topic per subscriber
- Messages only delivered when payload differs from last delivered value
- State persists across client reconnections
- Configured via topic patterns with wildcard support
Bridge behavior:
- Messages from bridges to local subscribers: change-only filtering applies
- Messages to bridges (outgoing): all messages forwarded (no filtering)
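The "How it works" steps can be sketched with the standard library alone. This is a minimal model, not the broker's implementation: `ChangeOnlyFilter` is a hypothetical type, and `DefaultHasher` is an assumed stand-in for whatever hash the broker uses.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Hypothetical filter that remembers the hash of the last payload
/// delivered for each (subscriber, topic) pair.
#[derive(Default)]
struct ChangeOnlyFilter {
    last_hash: HashMap<(String, String), u64>,
}

impl ChangeOnlyFilter {
    /// Returns `true` when the payload differs from the last one
    /// delivered to this subscriber on this topic, and records it.
    fn should_deliver(&mut self, subscriber: &str, topic: &str, payload: &[u8]) -> bool {
        // Assumption: a std hasher stands in for the broker's payload hash.
        let mut hasher = DefaultHasher::new();
        payload.hash(&mut hasher);
        let hash = hasher.finish();

        let key = (subscriber.to_owned(), topic.to_owned());
        // `insert` returns the previous hash; deliver unless it matches.
        self.last_hash.insert(key, hash) != Some(hash)
    }
}
```

Because the state is keyed by subscriber as well as topic, a second subscriber still receives a value that the first has already seen.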
Load Balancer Mode
Redirects clients to backend brokers using MQTT v5 UseAnotherServer (0x9C) with consistent hashing on the client ID.
Clients connecting to port 1883 receive a redirect and automatically reconnect to the assigned backend (up to 3 hops).
use ;
let config = new
.with_load_balancer;
let mut broker = with_config.await?;
broker.run.await?;
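To show what consistent hashing on the client ID means in practice, here is a small hash-ring sketch. `HashRing`, `hash_of`, and the virtual-node count are hypothetical choices for illustration; the crate's own ring layout and hash function are not documented here and may differ.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

/// Stand-in hash for the sketch. A real deployment would want a hash
/// whose output is guaranteed stable across Rust releases.
fn hash_of<T: Hash + ?Sized>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}

/// Hypothetical consistent-hash ring mapping client IDs to backends.
struct HashRing {
    ring: BTreeMap<u64, String>,
}

impl HashRing {
    /// Places `virtual_nodes` points on the ring for every backend.
    fn new(backends: &[&str], virtual_nodes: u32) -> Self {
        let mut ring = BTreeMap::new();
        for backend in backends {
            for replica in 0..virtual_nodes {
                ring.insert(hash_of(&(backend, replica)), backend.to_string());
            }
        }
        Self { ring }
    }

    /// Picks the first ring point at or after the client's hash,
    /// wrapping around to the start of the ring when needed.
    fn backend_for(&self, client_id: &str) -> Option<&str> {
        let point = hash_of(client_id);
        self.ring
            .range(point..)
            .next()
            .or_else(|| self.ring.iter().next())
            .map(|(_, backend)| backend.as_str())
    }
}
```

The same client ID always lands on the same backend while the backend set is unchanged, and adding or removing a backend only moves the clients whose ring segment changed.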
Broker Bridging
Connect two brokers together for message forwarding:
use ;
use QoS;
let bridge_config = new
.add_topic
.add_topic
.add_topic;
Bridge Loop Prevention
When using bidirectional bridges or complex multi-broker topologies, message loops can occur. The broker automatically detects and prevents loops using SHA-256 message fingerprints:
- Each message's fingerprint (hash of topic + payload + QoS + retain) is cached
- Duplicate fingerprints within the TTL window are dropped
- Default TTL: 60 seconds, default cache size: 10,000 fingerprints
use Duration;
let mut config = new
.add_topic;
config.loop_prevention_ttl = from_secs;
config.loop_prevention_cache_size = 50000;
Or via JSON configuration:
Note: Legitimate duplicate messages (identical content sent within the TTL window) will also be blocked. Adjust TTL based on your use case.
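The fingerprint cache described above can be modelled as follows. This is a sketch under stated assumptions: `LoopDetector` and `fingerprint` are hypothetical names, `DefaultHasher` replaces SHA-256 so the example needs no external crates, and evicting the oldest entry when the cache is full is a guess at the eviction policy.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashSet, VecDeque};
use std::hash::{Hash, Hasher};
use std::time::{Duration, Instant};

/// Hashes topic + payload + QoS + retain.
/// Assumption: a std hasher stands in for SHA-256 in this sketch.
fn fingerprint(topic: &str, payload: &[u8], qos: u8, retain: bool) -> u64 {
    let mut hasher = DefaultHasher::new();
    (topic, payload, qos, retain).hash(&mut hasher);
    hasher.finish()
}

/// Hypothetical TTL-bounded, size-bounded cache of recent fingerprints.
struct LoopDetector {
    ttl: Duration,
    capacity: usize,
    seen: HashSet<u64>,
    order: VecDeque<(Instant, u64)>, // oldest entry at the front
}

impl LoopDetector {
    fn new(ttl: Duration, capacity: usize) -> Self {
        Self { ttl, capacity, seen: HashSet::new(), order: VecDeque::new() }
    }

    /// Returns `false` when the fingerprint was already seen within the TTL.
    fn should_forward(&mut self, fp: u64, now: Instant) -> bool {
        // Drop entries whose TTL has elapsed.
        while let Some(&(seen_at, old)) = self.order.front() {
            if now.duration_since(seen_at) < self.ttl {
                break;
            }
            self.order.pop_front();
            self.seen.remove(&old);
        }
        if self.seen.contains(&fp) {
            return false;
        }
        // Evict the oldest entry when the cache is full (assumed policy).
        if self.order.len() >= self.capacity {
            if let Some((_, evicted)) = self.order.pop_front() {
                self.seen.remove(&evicted);
            }
        }
        self.order.push_back((now, fp));
        self.seen.insert(fp);
        true
    }
}
```

The sketch also makes the caveat in the note concrete: a second, legitimate message with identical topic, payload, QoS, and retain flag is rejected until the TTL has elapsed.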
Event Hooks
Monitor and react to broker events with custom handlers:
use BrokerConfig;
use ;
use Future;
use Pin;
use Arc;
;
let config = default
.with_event_handler;
Available hooks: on_client_connect, on_client_subscribe, on_client_unsubscribe, on_client_publish, on_client_disconnect, on_retained_set, on_message_delivered.
The ClientPublishEvent includes the MQTT 5.0 request/response fields (response_topic, correlation_data), so event handlers can see where a client wants responses sent and echo back its correlation data.
Configuration
Multi-Transport Broker
use ;
use ;
async
Client
The client library provides an async MQTT client designed for both IoT devices and cloud applications.
Core Capabilities
The client speaks both MQTT v5.0 and v3.1.1, with callback-based message handling that automatically routes messages to registered handlers. Subscribe returns a (packet_id, qos) tuple for cloud SDK compatibility (AWS IoT, Azure IoT Hub). Automatic reconnection with exponential backoff restores connections after network disruptions, while client-side message queuing buffers publishes during offline periods. The client also validates broker responses and surfaces the reason code when a publish is rejected (ACL denials, quota limits).
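For readers unfamiliar with exponential backoff, the delay schedule can be sketched as below. The `backoff_delay` function, the base delay, and the cap are hypothetical; the client's actual defaults and any jitter it applies are not stated here.

```rust
use std::time::Duration;

/// Delay before reconnect attempt number `attempt` (starting at 0):
/// `base * 2^attempt`, capped at `max`.
/// Hypothetical helper; the real client's parameters may differ.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // Saturate instead of overflowing for very large attempt counts.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max).min(max)
}
```

With a 1-second base and a 60-second cap, the delays run 1 s, 2 s, 4 s, 8 s, and so on, then stay at 60 s.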
QUIC Transport
use MqttClient;
async
Connection Migration (QUIC)
use MqttClient;
let client = new;
client.connect.await?;
client.migrate.await?;
See the QUIC Transport Guide for stream strategies, flow headers, and configuration details.
AWS IoT
use ;
use Duration;
let client = new;
client.connect.await?;
let = client.subscribe.await?;
use NamespaceValidator;
let validator = aws_iot.with_device_id;
client.publish.await?;
AWS IoT features:
- AWS IoT endpoint detection
- Topic validation for AWS IoT restrictions and limits
- ALPN protocol support for AWS IoT
- Client certificate loading from bytes (PEM/DER formats)
- SDK compatibility: Subscribe method returns a (packet_id, qos) tuple
Testing with Mock Client
use ;
async
async
Keepalive Configuration
use ConnectOptions;
use KeepaliveConfig;
use Duration;
let options = new
.with_keep_alive
.with_keepalive_config;
KeepaliveConfig controls ping timing and timeout tolerance:
- ping_interval_percent: when to send PINGREQ (default 75% of keep_alive)
- timeout_percent: how long to wait for PINGRESP (default 150%, use 200%+ for high-latency links)
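As a worked example of the two percentages, the sketch below turns them into concrete durations. It assumes both values are percentages of keep_alive, which the text states for ping_interval_percent but not explicitly for timeout_percent; `KeepaliveTiming` and `keepalive_timing` are hypothetical names.

```rust
use std::time::Duration;

/// Concrete timings derived from the keepalive percentages.
#[derive(Debug, PartialEq)]
struct KeepaliveTiming {
    ping_interval: Duration,
    pingresp_timeout: Duration,
}

/// Assumption: both percentages are relative to `keep_alive`.
fn keepalive_timing(
    keep_alive: Duration,
    ping_interval_percent: u32,
    timeout_percent: u32,
) -> KeepaliveTiming {
    KeepaliveTiming {
        ping_interval: keep_alive * ping_interval_percent / 100,
        pingresp_timeout: keep_alive * timeout_percent / 100,
    }
}
```

Under that assumption, a 60-second keep_alive with the defaults gives a PINGREQ every 45 seconds and a 90-second wait for PINGRESP.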
OpenTelemetry
Distributed tracing with OpenTelemetry support:
[dependencies]
mqtt5 = { version = "0.31", features = ["opentelemetry"] }
- W3C trace context propagation via MQTT user properties
- Span creation for publish/subscribe operations
- Bridge trace context forwarding
- Publisher-to-subscriber observability
use ;
use TelemetryConfig;
async
See crates/mqtt5/examples/broker_with_opentelemetry.rs for a complete example.
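To make the W3C trace context propagation concrete, the sketch below formats and parses a version-00 `traceparent` value, the string that would travel in an MQTT user property. The function names are hypothetical, and the user property key the crate uses is not shown here; only the `traceparent` layout itself comes from the W3C Trace Context specification.

```rust
/// Formats a W3C `traceparent` value: `00-<trace-id>-<parent-id>-<flags>`.
fn format_traceparent(trace_id: u128, span_id: u64, sampled: bool) -> String {
    format!("00-{:032x}-{:016x}-{:02x}", trace_id, span_id, u8::from(sampled))
}

/// True when `s` has exactly `len` ASCII hex digits.
fn is_hex(s: &str, len: usize) -> bool {
    s.len() == len && s.bytes().all(|b| b.is_ascii_hexdigit())
}

/// Parses a version-00 `traceparent` value into
/// `(trace_id, span_id, sampled)`; returns `None` if malformed.
fn parse_traceparent(value: &str) -> Option<(u128, u64, bool)> {
    let mut parts = value.split('-');
    let version = parts.next()?;
    let trace_id = parts.next()?;
    let span_id = parts.next()?;
    let flags = parts.next()?;
    if version != "00" || parts.next().is_some() {
        return None;
    }
    if !is_hex(trace_id, 32) || !is_hex(span_id, 16) || !is_hex(flags, 2) {
        return None;
    }
    let trace_id = u128::from_str_radix(trace_id, 16).ok()?;
    let span_id = u64::from_str_radix(span_id, 16).ok()?;
    let flags = u8::from_str_radix(flags, 16).ok()?;
    // All-zero IDs are invalid per the specification.
    if trace_id == 0 || span_id == 0 {
        return None;
    }
    Some((trace_id, span_id, flags & 0x01 == 0x01))
}
```

A publisher would attach the formatted value to the outgoing message, and the subscriber side would parse it to continue the same trace.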
Transport URLs
| Transport | URL Format | Port |
|---|---|---|
| TCP | mqtt://host:port | 1883 |
| TLS | mqtts://host:port | 8883 |
| WebSocket | ws://host:port/path | 8080 |
| QUIC | quic://host:port | 14567 |
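The table can be read as a scheme-to-port mapping. The helper below is a hypothetical illustration that treats the listed ports as conventional defaults, which is an assumption; it is not part of the crate's API.

```rust
/// Returns the port listed in the table for a URL's scheme,
/// or `None` for an unknown scheme or a string without `://`.
/// Hypothetical helper for illustration only.
fn default_port(url: &str) -> Option<u16> {
    let (scheme, _rest) = url.split_once("://")?;
    match scheme {
        "mqtt" => Some(1883),
        "mqtts" => Some(8883),
        "ws" => Some(8080),
        "quic" => Some(14567),
        _ => None,
    }
}
```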
License
Licensed under either of Apache License, Version 2.0 or MIT license at your option.