Crate mqtt5

Source
Expand description

§Complete MQTT v5.0 Platform

A complete MQTT v5.0 platform providing both high-performance async client library and full-featured broker implementation. Features include certificate loading from bytes, multi-transport support (TCP, TLS, WebSocket), authentication, bridging, and comprehensive testing.

§Architecture

This library uses Rust’s native async/await patterns throughout:

  • Direct async methods for all operations
  • Background async tasks for continuous operations (packet reading, keepalive)
  • The Tokio runtime for task scheduling

For architectural details, see ARCHITECTURE.md.

§Quick Start

use mqtt5::{MqttClient, ConnectOptions, QoS};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = MqttClient::new("test-client");
     
    // Direct async connect
    client.connect("mqtt://test.mosquitto.org:1883").await?;
     
    // Direct async subscribe with callback
    client.subscribe("sensors/+/data", |msg| {
        println!("Received {} on {}",
                 String::from_utf8_lossy(&msg.payload),
                 msg.topic);
    }).await?;
     
    // Direct async publish
    client.publish("sensors/temp/data", b"25.5").await?;
     
    Ok(())
}

§Advanced Example

use mqtt5::{MqttClient, ConnectOptions, PublishOptions, QoS, ConnectionEvent};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure connection options
    let options = ConnectOptions::new("weather-station")
        .with_clean_start(false)  // Resume previous session
        .with_keep_alive(Duration::from_secs(30))
        .with_automatic_reconnect(true)
        .with_reconnect_delay(Duration::from_secs(5), Duration::from_secs(60));
     
    let client = MqttClient::with_options(options);
     
    // Monitor connection events
    client.on_connection_event(|event| {
        match event {
            ConnectionEvent::Connected { session_present } => {
                println!("Connected! Session present: {}", session_present);
            }
            ConnectionEvent::Disconnected { reason } => {
                println!("Disconnected: {:?}", reason);
            }
            ConnectionEvent::Reconnecting { attempt } => {
                println!("Reconnecting... attempt {}", attempt);
            }
            ConnectionEvent::ReconnectFailed { error } => {
                println!("Reconnection failed: {}", error);
            }
        }
    }).await?;
     
    // Connect to broker
    client.connect("mqtts://broker.example.com:8883").await?;
     
    // Subscribe with QoS 2 for critical data
    client.subscribe("weather/+/alerts", |msg| {
        if msg.retain {
            println!("Retained alert: {}", String::from_utf8_lossy(&msg.payload));
        } else {
            println!("New alert: {}", String::from_utf8_lossy(&msg.payload));
        }
    }).await?;
     
    // Publish with custom options
    let mut pub_opts = PublishOptions::default();
    pub_opts.qos = QoS::ExactlyOnce;
    pub_opts.retain = true;
    pub_opts.properties.message_expiry_interval = Some(3600); // 1 hour
     
    client.publish_with_options(
        "weather/station01/temperature",
        b"25.5",
        pub_opts
    ).await?;
     
    // Keep running
    tokio::time::sleep(Duration::from_secs(3600)).await;
     
    Ok(())
}

§Broker Example

This library also provides a complete MQTT broker implementation:

use mqtt5::broker::{BrokerConfig, MqttBroker};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {  
    // Create a basic broker
    let mut broker = MqttBroker::bind("0.0.0.0:1883").await?;
     
    println!("🚀 MQTT broker running on port 1883");
     
    // Run until shutdown
    broker.run().await?;
    Ok(())
}

§Advanced Broker with Multi-Transport

use mqtt5::broker::{BrokerConfig, TlsConfig, WebSocketConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = BrokerConfig::default()
        // TCP on port 1883
        .with_bind_address("0.0.0.0:1883".parse()?)
        // TLS on port 8883
        .with_tls(
            TlsConfig::new("certs/server.crt".into(), "certs/server.key".into())
                .with_bind_address("0.0.0.0:8883".parse()?)
        )
        // WebSocket on port 8080
        .with_websocket(
            WebSocketConfig::default()
                .with_bind_address("0.0.0.0:8080".parse()?)
                .with_path("/mqtt")
        );

    let mut broker = MqttBroker::with_config(config).await?;
     
    println!("🚀 Multi-transport MQTT broker running");
    println!("  📡 TCP:       mqtt://localhost:1883");
    println!("  🔒 TLS:       mqtts://localhost:8883");  
    println!("  🌐 WebSocket: ws://localhost:8080/mqtt");
     
    broker.run().await?;
    Ok(())
}

Re-exports§

pub use client::ConnectionEvent;
pub use client::DisconnectReason;
pub use client::MockCall;
pub use client::MockMqttClient;
pub use client::MqttClient;
pub use client::MqttClientTrait;
pub use error::MqttError;
pub use error::Result;
pub use packet::publish::PublishPacket;
pub use packet::FixedHeader;
pub use packet::Packet;
pub use packet::PacketType;
pub use protocol::v5::properties::Properties;
pub use protocol::v5::properties::PropertyId;
pub use protocol::v5::properties::PropertyValue;
pub use protocol::v5::properties::PropertyValueType;
pub use types::ConnectOptions;
pub use types::ConnectProperties;
pub use types::ConnectResult;
pub use types::ConnectionStats;
pub use types::Message;
pub use types::MessageProperties;
pub use types::PublishOptions;
pub use types::PublishProperties;
pub use types::PublishResult;
pub use types::RetainHandling;
pub use types::SubscribeOptions;
pub use types::WillMessage;
pub use types::WillProperties;
pub use validation::is_valid_client_id;
pub use validation::is_valid_topic_filter;
pub use validation::is_valid_topic_name;
pub use validation::topic_matches_filter;
pub use validation::validate_client_id;
pub use validation::validate_topic_filter;
pub use validation::validate_topic_name;
pub use validation::RestrictiveValidator;
pub use validation::StandardValidator;
pub use validation::TopicValidator;

Modules§

broker
MQTT v5.0 Broker Implementation
callback
client
MQTT v5.0 Client - Direct Async Implementation
constants
MQTT Protocol Constants
encoding
error
flags
MQTT packet flag definitions using BeBytes v2.1.0 flag decomposition
packet
packet_id
Packet ID generation for MQTT
protocol
session
tasks
Background async tasks for MQTT client
test_utils
Test utilities for the MQTT library
topic_matching
transport
types
validation

Macros§

test_timeout
Helper macro for async tests with timeout

Enums§

QoS