[−][src]Crate rumqttc
A pure rust MQTT client which strives to be robust, efficient and easy to use. This library is backed by an async (tokio) eventloop which handles all the robustness and and efficiency parts of MQTT but naturally fits into both sync and async worlds as we'll see
Let's jump into examples right away
A simple synchronous publish and subscribe
use rumqttc::{MqttOptions, Client, QoS}; use std::time::Duration; use std::thread; fn main() { let mut mqttoptions = MqttOptions::new("rumqtt-sync-client", "test.mosquitto.org", 1883); mqttoptions.set_keep_alive(5); let (mut client, mut connection) = Client::new(mqttoptions, 10); client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap(); thread::spawn(move || for i in 0..10 { client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).unwrap(); thread::sleep(Duration::from_millis(100)); }); // Iterate to poll the eventloop for connection progress for (i, notification) in connection.iter().enumerate() { println!("Notification = {:?}", notification); } }
What's happening behind the scenes
- Eventloop orchestrates user requests and incoming packets concurrently and hadles the state
- Ping the broker when necessary and detects client side half open connections as well
- Throttling of outgoing packets
- Queue size based flow control on outgoing packets
- Automatic reconnections
- Natural backpressure to the client during slow network
In short, everything necessary to maintain a robust connection
NOTE: Looping on connection.iter()
is necessary to run the eventloop. It yields both
incoming and outgoing activity notifications which allows customization as user sees fit.
Blocking here will block connection progress
A simple asynchronous publish and subscribe
use rumqttc::{MqttOptions, Request, EventLoop}; use std::time::Duration; use std::error::Error; #[tokio::main(core_threads = 1)] async fn main() { let mut mqttoptions = MqttOptions::new("rumqtt-async", "test.mosquitto.org", 1883); let mut eventloop = EventLoop::new(mqttoptions, 10).await; let requests_tx = eventloop.handle(); loop { let notification = eventloop.poll().await.unwrap(); println!("Received = {:?}", notification); tokio::time::delay_for(Duration::from_secs(1)).await; } }
- Reconnects if polled again after an error
- User handle to send requests is just a channel
Since eventloop is externally polled (with iter()/poll()
) out side the library, users can
- Distribute incoming messages based on topics
- Stop it when required
- Access internal state for use cases like graceful shutdown
Structs
Client |
|
ConnAck | Acknowledgement to connect packet |
Connect | Connection packet initiated by the client |
Connection | MQTT connection. Maintains all the necessary state and automatically retries connections in flaky networks. |
Disconnect | |
EventLoop | Eventloop with all the state of a connection |
FixedHeader | Packet type from a byte |
LastWill | LastWill that broker forwards on behalf of the client |
MqttOptions | Options to configure the behaviour of mqtt connection |
MqttState | State of the mqtt connection. |
Network | Network transforms packets <-> frames efficiently. It takes advantage of pre-allocation, buffering and vectorization when appropriate to achieve performance |
PingReq | |
PingResp | |
PubAck | Acknowledgement to QoS1 publish |
PubComp | Acknowledgement to pubrel |
PubRec | Acknowledgement to QoS2 publish |
PubRel | Acknowledgement to pubrec |
Publish | Publish packet |
SubAck | Acknowledgement to subscribe |
Subscribe | Subscription packet |
SubscribeTopic | Subscription filter |
UnsubAck | Acknowledgement to unsubscribe |
Unsubscribe | Unsubscribe packet |
Enums
ClientError | Client Error |
ConnectReturnCode | Return code in connack |
ConnectionError | Critical errors during eventloop polling |
Error | |
Incoming | |
Key | Key type for TLS authentication |
Outgoing | Current outgoing activity on the eventloop |
Packet | Encapsulates all MQTT packet types |
PacketType | MQTT packet type |
Protocol | Protocol type |
QoS | Quality of service |
Request | Requests by the client to mqtt event loop. Request are handled one by one. This is a duplicate of possible MQTT packets along with the ability to tag data and do bulk operations. Upcoming feature: When 'manual' feature is turned on provides the ability to reply with acks when the user sees fit |
SecurityOptions | Client authentication option for mqtt connect packet |
StateError | |
SubscribeReturnCodes | Subscription return code |
Functions
check | Checks if the stream has enough bytes to frame a packet and returns fixed header |
has_wildcards | Checks if a topic or topic filter has wildcards |
matches | Checks if topic matches a filter. topic and filter validation isn't done here. |
mqtt_read | Reads a stream of bytes and extracts MQTT packets |
qos | Maps a number to QoS |
valid_filter | Checks if the filter is valid |
valid_topic | Checks if a topic is valid |