A pure Rust MQTT client which strives to be robust, efficient and easy to use.
- Eventloop is just an async Stream which can be polled by tokio
- Requests to the eventloop are also a Stream, which covers both bounded and unbounded use cases
- Robustness is just a loop away
- Flexible access to the state of the eventloop to control its behaviour
§Accepts any stream of Requests
Build bounded, unbounded, interruptible or any other stream (that fits your need) to feed the eventloop.
A few of our real-world use cases:
- A stream which orchestrates data between disk and memory by detecting backpressure, and (practically) never loses data
- A stream which juggles data between several channels based on the priority of the data
    #[tokio::main(core_threads = 1)]
    async fn main() {
        let mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
        // any stream of requests works here; a bounded channel is one option
        let (_requests_tx, requests_rx) = channel(10);
        let mut eventloop = eventloop(mqttoptions, requests_rx);
        let mut stream = eventloop.connect().await.unwrap();
        while let Some(item) = stream.next().await {
            println!("Received = {:?}", item);
        }
    }
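The priority use case listed above can be illustrated with a small sketch. This is not the crate's implementation: it is a synchronous, std-only illustration of preferring one channel over another, and the names `next_by_priority` and `drain_by_priority` are hypothetical. A real request source for the eventloop would wrap the same idea in an async Stream.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Hypothetical helper: returns the next queued item, preferring `high`
/// over `low`. Returns `None` when neither channel has an item ready.
pub fn next_by_priority<T>(high: &Receiver<T>, low: &Receiver<T>) -> Option<T> {
    match high.try_recv() {
        Ok(item) => Some(item),
        // Nothing urgent is queued (or the urgent sender is gone):
        // fall back to the low priority channel.
        Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => low.try_recv().ok(),
    }
}

/// Hypothetical helper: drains everything that is currently queued,
/// always checking the high priority channel first.
pub fn drain_by_priority<T>(high: &Receiver<T>, low: &Receiver<T>) -> Vec<T> {
    let mut out = Vec::new();
    while let Some(item) = next_by_priority(high, low) {
        out.push(item);
    }
    out
}
```

Because the high priority channel is checked before every single item, an urgent request queued late still overtakes older low priority data.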
§Robustness a loop away
Networks are unreliable, but robustness is easy:
- Just create a new stream from the existing eventloop
- It resumes from where it left off
- Access the state of the eventloop to customize the behaviour of the next connection
    #[tokio::main(core_threads = 1)]
    async fn main() {
        let mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
        // any stream of requests works here; a bounded channel is one option
        let (_requests_tx, requests_rx) = channel(10);
        let mut eventloop = eventloop(mqttoptions, requests_rx);
        // loop to reconnect and resume
        loop {
            let mut stream = eventloop.connect().await.unwrap();
            while let Some(item) = stream.next().await {
                println!("Received = {:?}", item);
            }
            // the stream ended; wait a little before reconnecting
            time::delay_for(Duration::from_secs(1)).await;
        }
    }
§Eventloop is just a stream which can be polled with tokio
- Plug it into select! or join! to interleave with other streams on the same thread
    #[tokio::main(core_threads = 1)]
    async fn main() {
        let mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
        // any stream of requests works here; a bounded channel is one option
        let (_requests_tx, requests_rx) = channel(10);
        let mut eventloop = eventloop(mqttoptions, requests_rx);
        // plug it into tokio ecosystem
        let mut stream = eventloop.connect().await.unwrap();
    }
§Powerful notification system to control the runtime
The eventloop stream yields all the interesting events, ranging from data on the network to disconnections and reconnections. Use it the way you see fit:
- Resubscribe after reconnection
- Stop after receiving the Nth puback
    #[tokio::main(core_threads = 1)]
    async fn main() {
        let mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
        let (mut requests_tx, requests_rx) = channel(10);
        let mut eventloop = eventloop(mqttoptions, requests_rx);
        // loop to reconnect and resume
        loop {
            let mut stream = eventloop.connect().await.unwrap();
            while let Some(notification) = stream.next().await {
                println!("Received = {:?}", notification);
                match notification {
                    // `subscribe` is a placeholder for a subscribe request built elsewhere;
                    // `.await` assumes `channel` is tokio's mpsc channel
                    Notification::Connect => requests_tx.send(subscribe).await.unwrap(),
                    // ignore every other notification in this example
                    _ => {}
                }
            }
            time::delay_for(Duration::from_secs(1)).await;
        }
    }
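The two behaviours listed above (resubscribe after reconnection, stop after the Nth puback) can be kept out of the polling loop by putting the decision in a small state machine. The sketch below is self-contained and does not use the crate's types: `Event`, `Action` and `Controller` are hypothetical stand-ins, and only the variants needed for the illustration are modelled.

```rust
/// Hypothetical stand-in for the notifications yielded by the stream.
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    /// A (re)connection was established.
    Connected,
    /// The broker acknowledged the publish with this packet identifier.
    Puback(u16),
    /// Anything else this sketch does not care about.
    Other,
}

/// What the polling loop should do in response to an event.
#[derive(Debug, Clone, PartialEq)]
pub enum Action {
    Resubscribe,
    Continue,
    Stop,
}

/// Counts acknowledgements and decides when the loop should stop.
pub struct Controller {
    pubacks_seen: usize,
    stop_after: usize,
}

impl Controller {
    pub fn new(stop_after: usize) -> Self {
        Controller { pubacks_seen: 0, stop_after }
    }

    pub fn on_event(&mut self, event: &Event) -> Action {
        match event {
            // Subscriptions may be gone after a reconnect, so ask for them again.
            Event::Connected => Action::Resubscribe,
            Event::Puback(_) => {
                self.pubacks_seen += 1;
                if self.pubacks_seen >= self.stop_after {
                    Action::Stop
                } else {
                    Action::Continue
                }
            }
            Event::Other => Action::Continue,
        }
    }
}
```

In a polling loop, each notification would be mapped to an `Event`, passed to `on_event`, and the returned `Action` would decide whether to send a subscribe request, keep polling, or break out.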
Modules§
- codec
- This module describes how to serialize and deserialize mqtt 4 packets
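As an illustration of the kind of work a packet codec does, here is a minimal sketch of the "remaining length" field of the fixed header, assuming "mqtt 4" refers to protocol level 4 (MQTT 3.1.1). It is not taken from the codec module; the function names are hypothetical. The field stores 7 bits per byte, least significant group first, and sets the top bit of a byte when more bytes follow, with at most four bytes.

```rust
/// Largest value that fits in four length bytes (MQTT 3.1.1, section 2.2.3).
pub const MAX_REMAINING_LENGTH: usize = 268_435_455;

/// Hypothetical helper: encodes `len` as an MQTT remaining length field.
/// Returns `None` if the value does not fit in four bytes.
pub fn encode_remaining_length(mut len: usize) -> Option<Vec<u8>> {
    if len > MAX_REMAINING_LENGTH {
        return None;
    }
    let mut out = Vec::with_capacity(4);
    loop {
        let mut byte = (len % 128) as u8;
        len /= 128;
        if len > 0 {
            // Continuation bit: more length bytes follow.
            byte |= 0x80;
        }
        out.push(byte);
        if len == 0 {
            break;
        }
    }
    Some(out)
}

/// Hypothetical helper: decodes a remaining length field from the start of
/// `bytes`. Returns the value and the number of bytes consumed, or `None`
/// if the input is truncated or uses more than four length bytes.
pub fn decode_remaining_length(bytes: &[u8]) -> Option<(usize, usize)> {
    let mut value = 0usize;
    for (i, &byte) in bytes.iter().take(4).enumerate() {
        value |= ((byte & 0x7F) as usize) << (7 * i);
        if byte & 0x80 == 0 {
            return Some((value, i + 1));
        }
    }
    None
}
```

For example, a length of 321 is encoded as the two bytes 0xC1 0x02, since 321 = 65 + 2 * 128 and the first byte carries the continuation bit.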
Structs§
- Connack
- Connack packet
- Connect
- Mqtt connect packet representation
- LastWill
- Last will of the connection
- MqttEventLoop
- Complete state of the eventloop
- MqttOptions
- Options to configure the behaviour of mqtt connection
- MqttState
- State of the mqtt connection.
- PacketIdentifier
- Packet identifier for packet types that require the broker to acknowledge
- Publish
- Publish packet
- Suback
- Subscription acknowledgement
- Subscribe
- Subscriber packet
- SubscribeTopic
- Subscription topic
- Unsubscribe
- Unsubscribe packet
Enums§
- Command
- Commands sent by the client to mqtt event loop. Commands are of higher priority and will be selected along with Requests
- ConnectReturnCode
- Connection return code sent by the server
- EventLoopError
- Critical errors during eventloop polling
- Notification
- Includes incoming packets from the network and other interesting events happening in the eventloop
- Packet
- Encapsulates all the possible mqtt packets
- Protocol
- Mqtt protocol version
- QoS
- Quality of service
- Request
- Requests by the client to mqtt event loop. Requests are handled one by one
- SecurityOptions
- Client authentication option for mqtt connect packet
- SubscribeReturnCodes
- Subscription return code sent by the broker
Traits§
- AsyncMqttRead
- Mqtt awareness on top of tokio’s AsyncRead
- AsyncMqttWrite
- Mqtt awareness on top of tokio’s AsyncWrite
- MqttRead
- Mqtt awareness on top of Read
- MqttWrite
- Mqtt awareness on top of Write
Functions§
- eventloop
- Returns an object which encompasses state of the connection. Use this to create a Stream with stream() method and poll it with tokio.
- has_wildcards
- Checks if a topic or topic filter has wildcards
- matches
- Checks if a topic matches a filter. Topic and filter validation isn’t done here. Note: ‘topic’ is a misnomer in the argument name, since this can also be used to match two wildcard subscriptions. Note: make sure a topic is validated during a publish and a filter is validated during a subscribe
- valid_filter
- Checks if the filter is valid https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718106
- valid_topic
- Checks if a topic is valid
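To make the rules behind these helpers concrete, here is a minimal sketch of topic validation and matching written against the MQTT 3.1.1 topic rules (section 4.7). It is not the crate's implementation and the function names are hypothetical; unlike the matches function described above, this sketch only compares a concrete topic against a filter and assumes both were validated first.

```rust
/// True if the string contains an MQTT wildcard character.
pub fn contains_wildcards(s: &str) -> bool {
    s.contains('+') || s.contains('#')
}

/// A topic name used in a publish must be non-empty and free of wildcards
/// and null characters.
pub fn is_valid_topic(topic: &str) -> bool {
    !topic.is_empty() && !contains_wildcards(topic) && !topic.contains('\0')
}

/// A filter is valid if every wildcard occupies a whole level and '#'
/// appears only as the last level.
pub fn is_valid_filter(filter: &str) -> bool {
    if filter.is_empty() || filter.contains('\0') {
        return false;
    }
    let levels: Vec<&str> = filter.split('/').collect();
    let last = levels.len() - 1;
    levels.iter().enumerate().all(|(i, level)| {
        if level.contains('#') {
            *level == "#" && i == last
        } else if level.contains('+') {
            *level == "+"
        } else {
            true
        }
    })
}

/// Matches a validated topic against a validated filter, level by level.
pub fn topic_matches(topic: &str, filter: &str) -> bool {
    // Topics starting with '$' are not matched by filters that start
    // with a wildcard.
    if topic.starts_with('$') && (filter.starts_with('+') || filter.starts_with('#')) {
        return false;
    }
    let mut topic_levels = topic.split('/');
    let mut filter_levels = filter.split('/');
    loop {
        match (filter_levels.next(), topic_levels.next()) {
            // '#' matches the parent level and everything below it.
            (Some("#"), _) => return true,
            // '+' matches exactly one level, whatever its content.
            (Some("+"), Some(_)) => continue,
            (Some(f), Some(t)) if f == t => continue,
            // Both ran out at the same time: every level matched.
            (None, None) => return true,
            _ => return false,
        }
    }
}
```

With these definitions, the filter sport/# matches both sport/tennis/player1 and sport itself, while sport/+ matches sport/ but not sport.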