Crate rumq_client

A pure Rust MQTT client that strives to be robust, efficient and easy to use.

  • Eventloop is just an async Stream which can be polled by tokio
  • Requests to the eventloop are also a Stream, which covers both bounded and unbounded use cases
  • Robustness is just a loop away
  • Flexible access to the state of eventloop to control its behaviour

Accepts any stream of Requests

Build a bounded, unbounded, interruptible or any other stream that fits your needs to feed the eventloop.

A few of our real-world use cases

  • A stream which orchestrates data between disk and memory by detecting backpressure, so that data is (practically) never lost
  • A stream which juggles data between several channels based on priority of the data
This example is not tested
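// Assumes tokio 0.2 with the `stream` feature; `StreamExt` provides `next()`.
use rumq_client::{eventloop, MqttOptions, Request};
use tokio::stream::StreamExt;
use tokio::sync::mpsc::channel;

#[tokio::main(core_threads = 1)]
async fn main() {
    let mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
    // Any stream of `Request`s can feed the eventloop; here it is the
    // receiving half of a bounded channel.
    let (_requests_tx, requests_rx) = channel::<Request>(10);

    let mut eventloop = eventloop(mqttoptions, requests_rx);
    let mut stream = eventloop.connect().await.unwrap();
    while let Some(item) = stream.next().await {
        println!("Received = {:?}", item);
    }
}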

Robustness a loop away

Networks are unreliable, but robustness is easy:

  • Just create a new stream from the existing eventloop
  • It resumes from where it left off
  • Access the state of the eventloop to customize the behaviour of the next connection
This example is not tested
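// Assumes tokio 0.2 with the `stream` feature; `StreamExt` provides `next()`.
use std::time::Duration;

use rumq_client::{eventloop, MqttOptions, Request};
use tokio::stream::StreamExt;
use tokio::sync::mpsc::channel;
use tokio::time;

#[tokio::main(core_threads = 1)]
async fn main() {
    let mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
    // Receiving half of a bounded channel used as the request stream.
    let (_requests_tx, requests_rx) = channel::<Request>(10);

    let mut eventloop = eventloop(mqttoptions, requests_rx);

    // loop to reconnect and resume
    loop {
        let mut stream = eventloop.connect().await.unwrap();
        while let Some(item) = stream.next().await {
            println!("Received = {:?}", item);
        }

        // wait a little before the next connection attempt
        time::delay_for(Duration::from_secs(1)).await;
    }
}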

Eventloop is just a stream which can be polled with tokio

  • Plug it into select! or join! to interleave it with other streams on the same thread
This example is not tested
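use rumq_client::{eventloop, MqttOptions, Request};
use tokio::sync::mpsc::channel;

#[tokio::main(core_threads = 1)]
async fn main() {
    let mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
    // Receiving half of a bounded channel used as the request stream.
    let (_requests_tx, requests_rx) = channel::<Request>(10);

    let mut eventloop = eventloop(mqttoptions, requests_rx);

    // plug it into tokio ecosystem
    let mut stream = eventloop.connect().await.unwrap();
}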

Powerful notification system to control the runtime

The eventloop stream yields all the interesting events, ranging from data on the network to disconnections and reconnections. Use it the way you see fit:

  • Resubscribe after reconnection
  • Stop after receiving Nth puback
This example is not tested
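// Assumes tokio 0.2 with the `stream` feature; `StreamExt` provides `next()`.
use std::time::Duration;

use rumq_client::{eventloop, MqttOptions, Notification};
use tokio::stream::StreamExt;
use tokio::sync::mpsc::channel;
use tokio::time;

#[tokio::main(core_threads = 1)]
async fn main() {
    let mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
    // Assumes tokio's bounded mpsc channel, whose `send` is async.
    let (mut requests_tx, requests_rx) = channel(10);

    let mut eventloop = eventloop(mqttoptions, requests_rx);

    // loop to reconnect and resume
    loop {
        let mut stream = eventloop.connect().await.unwrap();
        while let Some(notification) = stream.next().await {
            println!("Received = {:?}", notification);
            match notification {
                // `subscribe` stands for a subscribe `Request` built by the
                // application; its construction is not shown here.
                Notification::Connect => requests_tx.send(subscribe).await.unwrap(),
                // ignore every other notification
                _ => (),
            }
        }

        time::delay_for(Duration::from_secs(1)).await;
    }
}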

Modules

codec

This module describes how to serialize and deserialize mqtt 4 packets
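
As an illustration of the kind of work such a codec does, the sketch below encodes and decodes the "remaining length" field of the MQTT 3.1.1 fixed header, which uses 7 data bits per byte plus a continuation bit. It is a minimal standalone sketch based on the MQTT 3.1.1 specification, not the code of this module; the function names `encode_remaining_length` and `decode_remaining_length` are hypothetical.

```rust
/// Hypothetical helper: encodes `len` as an MQTT 3.1.1 "remaining length"
/// (1 to 4 bytes). Returns `None` if `len` exceeds the protocol maximum.
fn encode_remaining_length(mut len: usize) -> Option<Vec<u8>> {
    if len > 268_435_455 {
        return None;
    }
    let mut out = Vec::with_capacity(4);
    loop {
        let mut byte = (len % 128) as u8;
        len /= 128;
        if len > 0 {
            byte |= 0x80; // continuation bit: more length bytes follow
        }
        out.push(byte);
        if len == 0 {
            return Some(out);
        }
    }
}

/// Hypothetical helper: decodes a remaining length from the start of `bytes`.
/// Returns the value and the number of bytes consumed, or `None` if the
/// input is truncated or uses more than 4 length bytes.
fn decode_remaining_length(bytes: &[u8]) -> Option<(usize, usize)> {
    let mut value = 0usize;
    for (i, &byte) in bytes.iter().take(4).enumerate() {
        value |= ((byte & 0x7F) as usize) << (7 * i);
        if byte & 0x80 == 0 {
            return Some((value, i + 1));
        }
    }
    None
}

fn main() {
    // 321 = 65 + 2 * 128, so it needs two length bytes.
    let encoded = encode_remaining_length(321).unwrap();
    assert_eq!(encoded, vec![0xC1, 0x02]);
    assert_eq!(decode_remaining_length(&encoded), Some((321, 2)));
}
```

Returning the number of consumed bytes from the decoder lets a caller know where the variable header starts without re-scanning the input.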

Structs

Connack

Connack packet

Connect

Mqtt connect packet representation

LastWill

Last will of the connection

MqttEventLoop

Complete state of the eventloop

MqttOptions

Options to configure the behaviour of mqtt connection

MqttState

State of the mqtt connection.

PacketIdentifier

Packet identifier for packet types that require the broker to acknowledge

Publish

Publish packet

Suback

Subscription acknowledgement

Subscribe

Subscribe packet

SubscribeTopic

Subscription topic

Unsubscribe

Unsubscribe packet

Enums

Command

Commands sent by the client to the mqtt event loop. Commands have higher priority and are selected along with Requests

ConnectReturnCode

Connection return code sent by the server

EventLoopError

Critical errors during eventloop polling

Notification

Includes incoming packets from the network and other interesting events happening in the eventloop

Packet

Encapsulates all the possible mqtt packets

Protocol

Mqtt protocol version

QoS

Quality of service

Request

Requests by the client to the mqtt event loop. Requests are handled one by one

SecurityOptions

Client authentication option for mqtt connect packet

SubscribeReturnCodes

Subscription return code sent by the broker

Traits

AsyncMqttRead

Mqtt awareness on top of tokio's AsyncRead

AsyncMqttWrite

Mqtt awareness on top of tokio's AsyncWrite

MqttRead

Mqtt awareness on top of Read

MqttWrite

Mqtt awareness on top of Write

Functions

eventloop

Returns an object which encompasses the state of the connection. Use this to create a Stream with the stream() method and poll it with tokio.

has_wildcards

Checks if a topic or topic filter has wildcards

matches

Checks if a topic matches a filter. Topic and filter validation isn't done here. Note: 'topic' is a misnomer in the argument name. This can also be used to match two wildcard subscriptions. Note: make sure a topic is validated during a publish and a filter is validated during a subscribe.

valid_filter

Checks if the filter is valid. See https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718106

valid_topic

Checks if a topic is valid
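
The four topic helpers above can be pictured with the standalone sketch below, which follows the wildcard rules of the MQTT 3.1.1 specification. It is not this crate's implementation: the `sketch_*` names are hypothetical, and the sketch deliberately ignores details such as `$`-prefixed topics, the null character and length limits.

```rust
/// True if `s` contains an MQTT wildcard character (`+` or `#`).
fn sketch_has_wildcards(s: &str) -> bool {
    s.contains('+') || s.contains('#')
}

/// A topic name used in a publish must be non-empty and free of wildcards.
fn sketch_valid_topic(topic: &str) -> bool {
    !topic.is_empty() && !sketch_has_wildcards(topic)
}

/// A filter is valid if `+` and `#` each occupy a whole level
/// and `#` appears only as the last level.
fn sketch_valid_filter(filter: &str) -> bool {
    if filter.is_empty() {
        return false;
    }
    let levels: Vec<&str> = filter.split('/').collect();
    let last = levels.len() - 1;
    levels.iter().enumerate().all(|(i, level)| match *level {
        "+" => true,
        "#" => i == last,
        other => !sketch_has_wildcards(other),
    })
}

/// Matches an already validated topic against an already validated filter,
/// comparing them level by level.
fn sketch_matches(topic: &str, filter: &str) -> bool {
    let mut topic_levels = topic.split('/');
    for filter_level in filter.split('/') {
        if filter_level == "#" {
            // Multi-level wildcard: matches everything that remains,
            // including the parent level itself.
            return true;
        }
        match topic_levels.next() {
            Some(level) if filter_level == "+" || filter_level == level => {}
            _ => return false,
        }
    }
    // The filter is exhausted, so the topic must be exhausted too.
    topic_levels.next().is_none()
}

fn main() {
    assert!(sketch_valid_topic("sport/tennis"));
    assert!(sketch_valid_filter("sport/+/player1"));
    assert!(sketch_matches("sport/tennis/player1", "sport/+/player1"));
    assert!(sketch_matches("sport", "sport/#"));
    assert!(!sketch_matches("sport/tennis", "sport/+/player1"));
}
```

As in the description of matches, the matching function here performs no validation itself, so callers are expected to validate topics on publish and filters on subscribe.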