Crate mqtt5


§Complete MQTT v5.0 Platform

A complete MQTT v5.0 platform that provides both a high-performance async client library and a full-featured broker implementation. Features include certificate loading from bytes, multi-transport support (TCP, TLS, WebSocket), authentication, bridging, and comprehensive testing.

§Architecture

This library uses Rust’s native async/await patterns throughout:

  • Direct async methods for all operations
  • Background async tasks for continuous operations (packet reading, keepalive)
  • The Tokio runtime for task scheduling

For architectural details, see ARCHITECTURE.md.

§Quick Start

use mqtt5::MqttClient;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = MqttClient::new("test-client");
     
    // Direct async connect
    client.connect("mqtt://test.mosquitto.org:1883").await?;
     
    // Direct async subscribe with callback
    client.subscribe("sensors/+/data", |msg| {
        println!("Received {} on {}",
                 String::from_utf8_lossy(&msg.payload),
                 msg.topic);
    }).await?;
     
    // Direct async publish
    client.publish("sensors/temp/data", b"25.5").await?;
     
    Ok(())
}
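
The filter `sensors/+/data` in the Quick Start relies on MQTT wildcard matching: `+` stands for exactly one topic level and `#` for all remaining levels. The following is a minimal, standard-library-only sketch of those rules. The function name `filter_matches` is hypothetical and this is not the crate's own `topic_matches_filter`; it also assumes the filter has already been validated (for example, that `#` only appears as the last level).

```rust
/// Returns true if `topic` matches `filter` under MQTT wildcard rules.
/// Illustrative helper only; assumes `filter` is already a valid topic filter.
fn filter_matches(filter: &str, topic: &str) -> bool {
    // Topics beginning with '$' (such as $SYS/...) are not matched by
    // filters that begin with a wildcard.
    if topic.starts_with('$') && (filter.starts_with('+') || filter.starts_with('#')) {
        return false;
    }

    let mut filter_levels = filter.split('/');
    let mut topic_levels = topic.split('/');

    loop {
        match (filter_levels.next(), topic_levels.next()) {
            // '#' matches every remaining level, including none at all.
            (Some("#"), _) => return true,
            // '+' matches exactly one level, whatever its content.
            (Some("+"), Some(_)) => continue,
            // Literal levels must be identical.
            (Some(f), Some(t)) if f == t => continue,
            // Both sides exhausted at the same time: full match.
            (None, None) => return true,
            // Length mismatch or differing literal level.
            _ => return false,
        }
    }
}
```

Under this sketch, `filter_matches("sensors/+/data", "sensors/temp/data")` is true, while a topic with an extra level such as `sensors/temp/humidity/data` does not match.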

§Advanced Example

use mqtt5::{MqttClient, ConnectOptions, PublishOptions, QoS, ConnectionEvent};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure connection options
    let options = ConnectOptions::new("weather-station")
        .with_clean_start(false)  // Resume previous session
        .with_keep_alive(Duration::from_secs(30))
        .with_automatic_reconnect(true)
        .with_reconnect_delay(Duration::from_secs(5), Duration::from_secs(60));
     
    let client = MqttClient::with_options(options);
     
    // Monitor connection events
    client.on_connection_event(|event| {
        match event {
            ConnectionEvent::Connected { session_present } => {
                println!("Connected! Session present: {}", session_present);
            }
            ConnectionEvent::Disconnected { reason } => {
                println!("Disconnected: {:?}", reason);
            }
            ConnectionEvent::Reconnecting { attempt } => {
                println!("Reconnecting... attempt {}", attempt);
            }
            ConnectionEvent::ReconnectFailed { error } => {
                println!("Reconnection failed: {}", error);
            }
        }
    }).await?;
     
    // Connect to broker
    client.connect("mqtts://broker.example.com:8883").await?;
     
    // Subscribe to alerts and distinguish retained messages from live ones
    client.subscribe("weather/+/alerts", |msg| {
        if msg.retain {
            println!("Retained alert: {}", String::from_utf8_lossy(&msg.payload));
        } else {
            println!("New alert: {}", String::from_utf8_lossy(&msg.payload));
        }
    }).await?;
     
    // Publish with custom options
    let mut pub_opts = PublishOptions::default();
    pub_opts.qos = QoS::ExactlyOnce;
    pub_opts.retain = true;
    pub_opts.properties.message_expiry_interval = Some(3600); // 1 hour
     
    client.publish_with_options(
        "weather/station01/temperature",
        b"25.5",
        pub_opts
    ).await?;
     
    // Keep the process alive for an hour so the callbacks can fire
    tokio::time::sleep(Duration::from_secs(3600)).await;
     
    Ok(())
}
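
The call `with_reconnect_delay(Duration::from_secs(5), Duration::from_secs(60))` above takes two durations. A common reading, and an assumption here rather than something this page confirms, is that they are an initial delay and an upper bound, with the delay growing between attempts. The sketch below shows one such schedule (doubling with a cap) using only the standard library; `reconnect_delay` is a hypothetical helper, not part of the crate.

```rust
use std::time::Duration;

/// Delay before reconnect attempt `attempt` (1-based): starts at `initial`,
/// doubles on each further attempt, and never exceeds `max`.
/// Assumed policy for illustration; the crate's actual schedule may differ.
fn reconnect_delay(initial: Duration, max: Duration, attempt: u32) -> Duration {
    // Clamp the exponent so the shift below cannot overflow a u32.
    let exponent = attempt.saturating_sub(1).min(31);
    let factor = 1u32 << exponent;

    // checked_mul returns None on overflow; fall back to the cap in that case.
    initial.checked_mul(factor).map_or(max, |delay| delay.min(max))
}
```

With a 5 second initial delay and a 60 second cap, this yields 5, 10, 20, and 40 seconds for the first four attempts and stays at 60 seconds from the fifth attempt on.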

§Broker Example

This library also provides a complete MQTT broker implementation:

use mqtt5::broker::MqttBroker;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {  
    // Create a basic broker
    let mut broker = MqttBroker::bind("0.0.0.0:1883").await?;
     
    println!("🚀 MQTT broker running on port 1883");
     
    // Run until shutdown
    broker.run().await?;
    Ok(())
}

§Advanced Broker with Multi-Transport

use mqtt5::broker::{BrokerConfig, MqttBroker};
use mqtt5::broker::config::{TlsConfig, WebSocketConfig};
use std::net::SocketAddr;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = BrokerConfig::default()
        // TCP on port 1883
        .with_bind_address("0.0.0.0:1883".parse::<SocketAddr>()?)
        // TLS on port 8883
        .with_tls(
            TlsConfig::new("certs/server.crt".into(), "certs/server.key".into())
                .with_bind_address("0.0.0.0:8883".parse::<SocketAddr>()?)
        )
        // WebSocket on port 8080
        .with_websocket(
            WebSocketConfig::default()
                .with_bind_address("0.0.0.0:8080".parse::<SocketAddr>()?)
                .with_path("/mqtt")
        );

    let mut broker = MqttBroker::with_config(config).await?;
     
    println!("🚀 Multi-transport MQTT broker running");
    println!("  📡 TCP:       mqtt://localhost:1883");
    println!("  🔒 TLS:       mqtts://localhost:8883");  
    println!("  🌐 WebSocket: ws://localhost:8080/mqtt");
     
    broker.run().await?;
    Ok(())
}
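
The bind addresses above are produced with `"0.0.0.0:1883".parse::<SocketAddr>()?`. That parser is part of the standard library and accepts only a literal IP address plus port, so a hostname such as `localhost:1883` is rejected rather than resolved. The short sketch below isolates that step; `parse_listener` is a hypothetical wrapper used only for illustration.

```rust
use std::net::{AddrParseError, SocketAddr};

/// Parses a listener address such as "0.0.0.0:1883" or "[::]:8883".
/// Hostnames are not resolved here; only literal IP:port pairs parse.
fn parse_listener(addr: &str) -> Result<SocketAddr, AddrParseError> {
    addr.parse::<SocketAddr>()
}
```

Checking addresses this way before building the broker configuration surfaces a malformed address as an `AddrParseError` at startup.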

Re-exports§

pub use client::ConnectionEvent;
pub use client::DisconnectReason;
pub use client::MockCall;
pub use client::MockMqttClient;
pub use client::MqttClient;
pub use client::MqttClientTrait;
pub use types::ConnectOptions;
pub use types::ConnectionStats;

Modules§

broker
MQTT v5.0 Broker Implementation
callback
client
MQTT v5.0 Client - Direct Async Implementation
constants
MQTT Protocol Constants
encoding
error
flags
MQTT packet flag definitions using BeBytes v2.1.0 flag decomposition
packet
packet_id
Packet ID generation for MQTT
protocol
qos2
session
tasks
Background async tasks for MQTT client
telemetry
test_utils
Test utilities for the MQTT library
time
topic_matching
transport
types
validation

Macros§

test_timeout
Helper macro for async tests with timeout

Structs§

ConnectProperties
ConnectResult
FixedHeader
MQTT packet fixed header
Message
MessageProperties
Properties
Container for MQTT v5.0 properties
PublishOptions
PublishProperties
RestrictiveValidator
Restrictive validator with additional constraints
StandardValidator
Standard MQTT specification validator
SubscribeOptions
WillMessage
WillProperties

Enums§

MqttError
MQTT protocol errors
Packet
Enum representing all MQTT packet types
PacketType
PropertyId
MQTT v5.0 Property Identifiers
PropertyValue
Property value storage
PropertyValueType
Property value types
PublishResult
QoS
RetainHandling

Traits§

TopicValidator
Trait for pluggable topic validation
Transport

Functions§

is_valid_client_id
Validates an MQTT client identifier according to MQTT v5.0 specification
is_valid_topic_filter
Validates an MQTT topic filter according to MQTT v5.0 specification
is_valid_topic_name
Validates an MQTT topic name according to MQTT v5.0 specification
topic_matches_filter
Checks if a topic name matches a topic filter
validate_client_id
Validates a client ID and returns an error if invalid
validate_topic_filter
Validates a topic filter and returns an error if invalid
validate_topic_name
Validates a topic name and returns an error if invalid
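
The functions above come in pairs: an `is_valid_*` predicate and a `validate_*` variant that returns an error. The sketch below illustrates that pairing for topic names and filters using rules from the MQTT v5.0 specification (non-empty, at most 65,535 bytes, no null character, no wildcards in topic names, `+` and `#` occupying a whole level in filters, `#` only as the last level). All names here (`TopicError`, `check_topic_name`, `check_topic_filter`, `is_topic_name_ok`) are hypothetical, and the crate's own functions may apply additional or different checks.

```rust
/// Reasons a topic string can be rejected in this sketch.
#[derive(Debug, PartialEq)]
enum TopicError {
    Empty,
    TooLong,
    ContainsNul,
    WildcardInTopicName,
    MisplacedWildcard,
}

/// Checks shared by topic names and topic filters.
fn check_common(s: &str) -> Result<(), TopicError> {
    if s.is_empty() {
        return Err(TopicError::Empty);
    }
    // MQTT strings carry a two-byte length prefix, hence the 65,535-byte limit.
    if s.len() > 65_535 {
        return Err(TopicError::TooLong);
    }
    if s.contains('\0') {
        return Err(TopicError::ContainsNul);
    }
    Ok(())
}

/// Error-returning check for a topic name (used when publishing).
fn check_topic_name(topic: &str) -> Result<(), TopicError> {
    check_common(topic)?;
    if topic.chars().any(|c| c == '+' || c == '#') {
        return Err(TopicError::WildcardInTopicName);
    }
    Ok(())
}

/// Error-returning check for a topic filter (used when subscribing).
fn check_topic_filter(filter: &str) -> Result<(), TopicError> {
    check_common(filter)?;
    let levels: Vec<&str> = filter.split('/').collect();
    for (i, level) in levels.iter().enumerate() {
        let is_last = i + 1 == levels.len();
        match *level {
            // '+' alone in a level is always allowed.
            "+" => {}
            // '#' alone is allowed only as the final level.
            "#" if is_last => {}
            // Any other appearance of a wildcard character is invalid.
            l if l.contains('+') || l.contains('#') => {
                return Err(TopicError::MisplacedWildcard);
            }
            _ => {}
        }
    }
    Ok(())
}

/// Boolean predicate built on top of the error-returning check.
fn is_topic_name_ok(topic: &str) -> bool {
    check_topic_name(topic).is_ok()
}
```

Building the predicate on top of the error-returning function keeps the two in agreement; callers that only need a yes/no answer use the predicate, and callers that report problems to users get the specific reason.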

Type Aliases§

Result