[−][src]Crate rumqtt
Rumqtt is a pure rust mqtt client which strives to be robust, efficient and easy to use.
- Provides several reconnection options to automate reconnections
- All the network requests are done using channels and bad networks can be detected through back pressure
- Incoming notifications are delivered to the user through crossbeam channel
which provides a flexible
select!
macro - Clone the client to access mqtt eventloop from multiple threads
- Dynamically start and stop the network eventloop (Useful when you want the other network services to have more bandwidth)
- Inbuilt support for connecting to gcloud iot core which uses jwt tokens as password to authenticate the client
- Inbuilt support fot throttling (Saas IOT providers usually impose rate limiting)
- Http
connect
support to tunnel mqtt data through http proxy servers
Publish and subscribe
use rumqtt::{MqttClient, MqttOptions, QoS}; use std::{thread, time::Duration}; fn main() { let mqtt_options = MqttOptions::new("test-pubsub1", "localhost", 1883); let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); mqtt_client.subscribe("hello/world", QoS::AtLeastOnce).unwrap(); let sleep_time = Duration::from_secs(1); thread::spawn(move || { for i in 0..100 { let payload = format!("publish {}", i); thread::sleep(sleep_time); mqtt_client.publish("hello/world", QoS::AtLeastOnce, false, payload).unwrap(); } }); for notification in notifications { println!("{:?}", notification) } }
Select on incoming notifications using crossbeam select!
use rumqtt::{MqttClient, MqttOptions, QoS}; use std::{thread, time::Duration}; use crossbeam_channel::select; fn main() { let mqtt_options = MqttOptions::new("test-pubsub1", "localhost", 1883); let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); let (done_tx, done_rx) = crossbeam_channel::bounded(1); mqtt_client.subscribe("hello/world", QoS::AtLeastOnce).unwrap(); let sleep_time = Duration::from_secs(1); thread::spawn(move || { for i in 0..100 { let payload = format!("publish {}", i); thread::sleep(sleep_time); mqtt_client.publish("hello/world", QoS::AtLeastOnce, false, payload).unwrap(); } thread::sleep(sleep_time * 10); done_tx.send(true).unwrap(); }); // select between mqtt notifications and other channel rx loop { select! { recv(notifications) -> notification => { println!("{:?}", notification) } recv(done_rx) -> _done => break } } }
Clone the client to access mqtt eventloop from different threads
use rumqtt::{MqttClient, MqttOptions, QoS}; use std::{thread, time::Duration}; fn main() { let mqtt_options = MqttOptions::new("test-pubsub1", "localhost", 1883); let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); let mut c1 = mqtt_client.clone(); let mut c2 = mqtt_client.clone(); mqtt_client.subscribe("hello/world", QoS::AtLeastOnce).unwrap(); let sleep_time = Duration::from_secs(1); thread::spawn(move || { for i in 0..100 { let payload = format!("publish {}", i); thread::sleep(sleep_time); c1.publish("hello/world", QoS::AtLeastOnce, false, payload).unwrap(); } }); // pause and resume network from this thread thread::spawn(move || { let dur = Duration::new(5, 0); for i in 0..100 { if i % 2 == 0 { c2.pause().unwrap() } else { c2.resume().unwrap() } thread::sleep(dur); } }); // receive incoming notifications for notification in notifications { println!("{:?}", notification) } }
Re-exports
pub use crate::client::MqttClient; |
pub use crate::client::Notification; |
pub use crate::mqttoptions::ConnectionMethod; |
pub use crate::mqttoptions::MqttOptions; |
pub use crate::mqttoptions::Proxy; |
pub use crate::mqttoptions::ReconnectOptions; |
pub use crate::mqttoptions::SecurityOptions; |
pub use crate::error::ConnectError; |
pub use crate::error::ClientError; |
Modules
client | Structs to interact with mqtt eventloop |
codec | Codec to convert incoming bytes of a tcp stream into mqtt packets and outgoing mqtt packets to raw bytes |
error | All errors |
mqttoptions | Options to set mqtt client behaviour |
Structs
Receiver | The receiving side of a channel. |