[][src]Crate rumqttc

A pure rust MQTT client which strives to be robust, efficient and easy to use. This library is backed by an async (tokio) eventloop which handles all the robustness and and efficiency parts of MQTT but naturally fits into both sync and async worlds as we'll see

Let's jump into examples right away

A simple synchronous publish and subscribe

use rumqttc::{MqttOptions, Client, QoS};
use std::time::Duration;
use std::thread;

fn main() {
    let mut mqttoptions = MqttOptions::new("rumqtt-sync-client", "test.mosquitto.org", 1883);
    mqttoptions.set_keep_alive(5);

    let (mut client, mut connection) = Client::new(mqttoptions, 10);
    client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap();
    thread::spawn(move || for i in 0..10 {
       client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).unwrap();
       thread::sleep(Duration::from_millis(100));
    });

    // Iterate to poll the eventloop for connection progress
    for (i, notification) in connection.iter().enumerate() {
        println!("Notification = {:?}", notification);
    }
}

What's happening behind the scenes

  • Eventloop orchestrates user requests and incoming packets concurrently and hadles the state
  • Ping the broker when necessary and detects client side half open connections as well
  • Throttling of outgoing packets
  • Queue size based flow control on outgoing packets
  • Automatic reconnections
  • Natural backpressure to the client during slow network

In short, everything necessary to maintain a robust connection

NOTE: Looping on connection.iter() is necessary to run the eventloop. It yields both incoming and outgoing activity notifications which allows customization as user sees fit. Blocking here will block connection progress

A simple asynchronous publish and subscribe

use rumqttc::{MqttOptions, Request, EventLoop};
use std::time::Duration;
use std::error::Error;

#[tokio::main(core_threads = 1)]
async fn main() {
    let mut mqttoptions = MqttOptions::new("rumqtt-async", "test.mosquitto.org", 1883);
    let mut eventloop = EventLoop::new(mqttoptions, 10).await;
    let requests_tx = eventloop.handle();

    loop {
        let notification = eventloop.poll().await.unwrap();
        println!("Received = {:?}", notification);
        tokio::time::delay_for(Duration::from_secs(1)).await;
    }
}
  • Reconnects if polled again after an error
  • User handle to send requests is just a channel

Since eventloop is externally polled (with iter()/poll()) out side the library, users can

  • Distribute incoming messages based on topics
  • Stop it when required
  • Access internal state for use cases like graceful shutdown

Structs

Client

Client to communicate with MQTT eventloop Connection.

ConnAck

Acknowledgement to connect packet

Connect

Connection packet initiated by the client

Connection

MQTT connection. Maintains all the necessary state and automatically retries connections in flaky networks.

Disconnect
EventLoop

Eventloop with all the state of a connection

FixedHeader

Packet type from a byte

LastWill

LastWill that broker forwards on behalf of the client

MqttOptions

Options to configure the behaviour of mqtt connection

MqttState

State of the mqtt connection.

Network

Network transforms packets <-> frames efficiently. It takes advantage of pre-allocation, buffering and vectorization when appropriate to achieve performance

PingReq
PingResp
PubAck

Acknowledgement to QoS1 publish

PubComp

Acknowledgement to pubrel

PubRec

Acknowledgement to QoS2 publish

PubRel

Acknowledgement to pubrec

Publish

Publish packet

SubAck

Acknowledgement to subscribe

Subscribe

Subscription packet

SubscribeTopic

Subscription filter

UnsubAck

Acknowledgement to unsubscribe

Unsubscribe

Unsubscribe packet

Enums

ClientError

Client Error

ConnectReturnCode

Return code in connack

ConnectionError

Critical errors during eventloop polling

Error
Incoming
Key

Key type for TLS authentication

Outgoing

Current outgoing activity on the eventloop

Packet

Encapsulates all MQTT packet types

PacketType

MQTT packet type

Protocol

Protocol type

QoS

Quality of service

Request

Requests by the client to mqtt event loop. Request are handled one by one. This is a duplicate of possible MQTT packets along with the ability to tag data and do bulk operations. Upcoming feature: When 'manual' feature is turned on provides the ability to reply with acks when the user sees fit

SecurityOptions

Client authentication option for mqtt connect packet

StateError
SubscribeReturnCodes

Subscription return code

Functions

check

Checks if the stream has enough bytes to frame a packet and returns fixed header

has_wildcards

Checks if a topic or topic filter has wildcards

matches

Checks if topic matches a filter. topic and filter validation isn't done here.

mqtt_read

Reads a stream of bytes and extracts MQTT packets

qos

Maps a number to QoS

valid_filter

Checks if the filter is valid

valid_topic

Checks if a topic is valid