mqtt5 0.31.1

Complete MQTT v5.0 platform with high-performance async client and full-featured broker supporting TCP, TLS, WebSocket, authentication, bridging, and resource monitoring
Documentation

mqtt5

Full-featured MQTT v5.0 and v3.1.1 client and broker for native platforms (Linux, macOS, Windows). This is the primary crate for building MQTT applications — it provides both the async client for connecting to any MQTT broker and a production-ready broker implementation with multi-transport support, authentication, and bridging. For browser environments, see the companion mqtt5-wasm crate.

Features

  • MQTT v5.0 and v3.1.1 protocol support
  • Multiple transports: TCP, TLS, WebSocket, QUIC
  • QUIC multistream support with flow headers
  • QUIC connection migration for mobile clients
  • Automatic reconnection with exponential backoff
  • Configurable keepalive with timeout tolerance
  • Mock client for unit testing
  • OpenTelemetry distributed tracing (optional feature)

Installation

[dependencies]
mqtt5 = "0.31"

Quick Start

Client

use mqtt5::{MqttClient, ConnectOptions};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = MqttClient::new("client-id");
    let options = ConnectOptions::new("client-id".to_string());

    client.connect_with_options("mqtt://localhost:1883", options).await?;
    client.publish("topic", b"message").await?;
    client.disconnect().await?;

    Ok(())
}

Broker

use mqtt5::broker::{MqttBroker, BrokerConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut broker = MqttBroker::bind("0.0.0.0:1883").await?;
    broker.run().await?;
    Ok(())
}

Broker

The broker handles the full MQTT v5.0 and v3.1.1 protocol with cross-version interoperability. It supports QoS 0, 1, and 2 with proper flow control, session persistence (clean start, session expiry, message queuing), retained messages, shared subscriptions for load balancing across clients, and will messages (Last Will and Testament).

Transport & Security

Four transport types can run simultaneously in a single broker instance. TCP serves standard MQTT on port 1883. TLS/SSL provides encrypted MQTT on port 8883 with certificate authentication and client certificate validation. WebSocket enables browser-based MQTT connections. QUIC offers a modern UDP-based transport with built-in TLS 1.3 and multistream support.

Authentication

The broker supports multiple authentication methods:

Method Description Use Case
Password Argon2id hashed passwords Internal users
SCRAM-SHA-256 Challenge-response, no password transmission High security
JWT Stateless token verification Single IdP
Federated JWT Multi-issuer with JWKS auto-refresh Google, Keycloak, Azure AD
use mqtt5::broker::BrokerConfig;
use mqtt5::broker::config::{AuthConfig, AuthMethod};

let config = BrokerConfig::default()
    .with_auth(AuthConfig {
        allow_anonymous: false,
        password_file: Some("passwd.txt".into()),
        auth_method: AuthMethod::Password,
        ..Default::default()
    });

Use CompositeAuthProvider to chain enhanced auth with a password fallback for internal service clients:

use mqtt5::broker::auth::{CompositeAuthProvider, PasswordAuthProvider};
use std::sync::Arc;

let primary = broker.auth_provider();
let fallback = Arc::new(PasswordAuthProvider::new());
let broker = broker.with_auth_provider(Arc::new(CompositeAuthProvider::new(primary, fallback)));

See Authentication & Authorization Guide for full details.

Advanced Features

Change-Only Delivery

Reduces bandwidth for topics that frequently publish unchanged values (common with sensors).

use mqtt5::broker::config::{BrokerConfig, ChangeOnlyDeliveryConfig};

let config = BrokerConfig::new()
    .with_change_only_delivery(ChangeOnlyDeliveryConfig {
        enabled: true,
        topic_patterns: vec!["sensors/#".to_string(), "status/+".to_string()],
    });

How it works:

  • Broker tracks last payload hash per topic per subscriber
  • Messages only delivered when payload differs from last delivered value
  • State persists across client reconnections
  • Configured via topic patterns with wildcard support

Bridge behavior:

  • Messages from bridges to local subscribers: change-only filtering applies
  • Messages to bridges (outgoing): all messages forwarded (no filtering)

Load Balancer Mode

Redirects clients to backend brokers using MQTT v5 UseAnotherServer (0x9C) with consistent hashing on the client ID.

mqttv5 broker --host 0.0.0.0:1884 --allow-anonymous
mqttv5 broker --host 0.0.0.0:1885 --allow-anonymous

mqttv5 broker --config lb.json
{
  "bind_addresses": ["0.0.0.0:1883"],
  "load_balancer": {
    "backends": ["mqtt://127.0.0.1:1884", "mqtt://127.0.0.1:1885"]
  }
}

Clients connecting to port 1883 receive a redirect and automatically reconnect to the assigned backend (up to 3 hops).

use mqtt5::broker::{MqttBroker, config::{BrokerConfig, LoadBalancerConfig}};

let config = BrokerConfig::new()
    .with_load_balancer(LoadBalancerConfig::new(vec![
        "mqtt://backend1:1883".into(),
        "mqtt://backend2:1883".into(),
    ]));

let mut broker = MqttBroker::with_config(config).await?;
broker.run().await?;

Broker Bridging

Connect two brokers together for message forwarding:

use mqtt5::broker::bridge::{BridgeConfig, BridgeDirection};
use mqtt5::QoS;

let bridge_config = BridgeConfig::new("edge-to-cloud", "cloud-broker:1883")
    .add_topic("sensors/+/data", BridgeDirection::Out, QoS::AtLeastOnce)
    .add_topic("commands/+/device", BridgeDirection::In, QoS::AtLeastOnce)
    .add_topic("health/+/status", BridgeDirection::Both, QoS::AtMostOnce);

Bridge Loop Prevention

When using bidirectional bridges or complex multi-broker topologies, message loops can occur. The broker automatically detects and prevents loops using SHA-256 message fingerprints:

  • Each message's fingerprint (hash of topic + payload + QoS + retain) is cached
  • Duplicate fingerprints within the TTL window are dropped
  • Default TTL: 60 seconds, default cache size: 10,000 fingerprints
use std::time::Duration;

let mut config = BridgeConfig::new("my-bridge", "remote:1883")
    .add_topic("data/#", BridgeDirection::Both, QoS::AtLeastOnce);

config.loop_prevention_ttl = Duration::from_secs(300);
config.loop_prevention_cache_size = 50000;

Or via JSON configuration:

{
  "name": "my-bridge",
  "remote_address": "remote-broker:1883",
  "loop_prevention_ttl": "5m",
  "loop_prevention_cache_size": 50000,
  "topics": [...]
}

Note: Legitimate duplicate messages (identical content sent within the TTL window) will also be blocked. Adjust TTL based on your use case.

Event Hooks

Monitor and react to broker events with custom handlers:

use mqtt5::broker::BrokerConfig;
use mqtt5::broker::events::{
    BrokerEventHandler, ClientConnectEvent, ClientPublishEvent, ClientDisconnectEvent,
    PublishAction,
};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

struct MetricsHandler;

impl BrokerEventHandler for MetricsHandler {
    fn on_client_connect<'a>(
        &'a self,
        event: ClientConnectEvent,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            println!("Client connected: {}", event.client_id);
        })
    }

    fn on_client_publish<'a>(
        &'a self,
        event: ClientPublishEvent,
    ) -> Pin<Box<dyn Future<Output = PublishAction> + Send + 'a>> {
        Box::pin(async move {
            println!("Message published to {}: {} bytes", event.topic, event.payload.len());
            if let Some(response_topic) = &event.response_topic {
                println!("  Response topic: {response_topic}");
            }
            if let Some(correlation_data) = &event.correlation_data {
                println!("  Correlation data: {} bytes", correlation_data.len());
            }
            PublishAction::Continue
        })
    }

    fn on_client_disconnect<'a>(
        &'a self,
        event: ClientDisconnectEvent,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
        Box::pin(async move {
            println!("Client disconnected: {} (reason: {:?})", event.client_id, event.reason);
        })
    }
}

let config = BrokerConfig::default()
    .with_event_handler(Arc::new(MetricsHandler));

Available hooks: on_client_connect, on_client_subscribe, on_client_unsubscribe, on_client_publish, on_client_disconnect, on_retained_set, on_message_delivered.

The ClientPublishEvent includes MQTT 5.0 request/response fields (response_topic, correlation_data) enabling event handlers to see where clients want responses sent and echo back correlation data.

Configuration

Multi-Transport Broker

use mqtt5::broker::{BrokerConfig, MqttBroker};
use mqtt5::broker::config::{TlsConfig, WebSocketConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = BrokerConfig::default()
        .with_bind_address("0.0.0.0:1883".parse()?)
        .with_tls(
            TlsConfig::new("certs/server.crt".into(), "certs/server.key".into())
                .with_ca_file("certs/ca.crt".into())
                .with_bind_address("0.0.0.0:8883".parse()?)
        )
        .with_websocket(
            WebSocketConfig::default()
                .with_bind_address("0.0.0.0:8080".parse()?)
                .with_path("/mqtt")
        )
        .with_max_clients(10_000);

    let mut broker = MqttBroker::with_config(config).await?;

    println!("Multi-transport MQTT broker running:");
    println!("  TCP:       mqtt://localhost:1883");
    println!("  TLS:       mqtts://localhost:8883");
    println!("  WebSocket: ws://localhost:8080/mqtt");
    println!("  QUIC:      quic://localhost:14567");

    broker.run().await?;
    Ok(())
}

Client

The client library provides an async MQTT client designed for both IoT devices and cloud applications.

Core Capabilities

The client speaks both MQTT v5.0 and v3.1.1, with callback-based message handling that automatically routes messages to registered handlers. Subscribe returns a (packet_id, qos) tuple for cloud SDK compatibility (AWS IoT, Azure IoT Hub). Automatic reconnection with exponential backoff keeps connections alive through network disruptions, while client-side message queuing buffers publishes during offline periods. The client validates broker responses and surfaces reason code validation for publish rejections (ACL denials, quota limits).

QUIC Transport

use mqtt5::MqttClient;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = MqttClient::new("quic-client");

    client.connect("quic://broker.example.com:14567").await?;
    client.publish("sensors/temp", b"25.5").await?;
    client.disconnect().await?;

    Ok(())
}

Connection Migration (QUIC)

use mqtt5::MqttClient;

let client = MqttClient::new("mobile-client");
client.connect("quic://broker.example.com:14567").await?;

client.migrate().await?;

See the QUIC Transport Guide for stream strategies, flow headers, and configuration details.

AWS IoT

use mqtt5::{MqttClient, ConnectOptions};
use std::time::Duration;

let client = MqttClient::new("aws-iot-device-12345");

client.connect("mqtts://abcdef123456.iot.us-east-1.amazonaws.com:8883").await?;

let (packet_id, qos) = client.subscribe("$aws/things/device-123/shadow/update/accepted", |msg| {
    println!("Shadow update accepted: {:?}", msg.payload);
}).await?;

use mqtt5::validation::namespace::NamespaceValidator;

let validator = NamespaceValidator::aws_iot().with_device_id("device-123");

client.publish("$aws/things/device-123/shadow/update", shadow_data).await?;

AWS IoT features:

  • AWS IoT endpoint detection
  • Topic validation for AWS IoT restrictions and limits
  • ALPN protocol support for AWS IoT
  • Client certificate loading from bytes (PEM/DER formats)
  • SDK compatibility: Subscribe method returns (packet_id, qos) tuple

Testing with Mock Client

use mqtt5::{MockMqttClient, MqttClientTrait, PublishResult, QoS};

#[tokio::test]
async fn test_my_iot_function() {
    let mock = MockMqttClient::new("test-device");

    mock.set_connect_response(Ok(())).await;
    mock.set_publish_response(Ok(PublishResult::QoS1Or2 { packet_id: 123 })).await;

    my_iot_function(&mock).await.unwrap();

    let calls = mock.get_calls().await;
    assert_eq!(calls.len(), 2);
}

async fn my_iot_function<T: MqttClientTrait>(client: &T) -> Result<(), Box<dyn std::error::Error>> {
    client.connect("mqtt://broker").await?;
    client.publish_qos1("telemetry", b"data").await?;
    Ok(())
}

Keepalive Configuration

use mqtt5::ConnectOptions;
use mqtt5::types::KeepaliveConfig;
use std::time::Duration;

let options = ConnectOptions::new("client-id")
    .with_keep_alive(Duration::from_secs(30))
    .with_keepalive_config(KeepaliveConfig::new(75, 200));

KeepaliveConfig controls ping timing and timeout tolerance:

  • ping_interval_percent: when to send PINGREQ (default 75% of keep_alive)
  • timeout_percent: how long to wait for PINGRESP (default 150%, use 200%+ for high-latency)

OpenTelemetry

Distributed tracing with OpenTelemetry support:

[dependencies]
mqtt5 = { version = "0.31", features = ["opentelemetry"] }
  • W3C trace context propagation via MQTT user properties
  • Span creation for publish/subscribe operations
  • Bridge trace context forwarding
  • Publisher-to-subscriber observability
use mqtt5::broker::{BrokerConfig, MqttBroker};
use mqtt5::telemetry::TelemetryConfig;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let telemetry_config = TelemetryConfig::new("mqtt-broker")
        .with_endpoint("http://localhost:4317")
        .with_sampling_ratio(1.0);

    let config = BrokerConfig::default()
        .with_bind_address(([127, 0, 0, 1], 1883))
        .with_opentelemetry(telemetry_config);

    let mut broker = MqttBroker::with_config(config).await?;
    broker.run().await?;
    Ok(())
}

See crates/mqtt5/examples/broker_with_opentelemetry.rs for a complete example.

Transport URLs

Transport URL Format Port
TCP mqtt://host:port 1883
TLS mqtts://host:port 8883
WebSocket ws://host:port/path 8080
QUIC quic://host:port 14567

License

Licensed under either of Apache License, Version 2.0 or MIT license at your option.