Mosquitto is a popular MQTT broker implemented in C. Although there are pure Rust MQTT clients, it is still useful to have a binding to the Mosquitto client.
The basic story is that you connect to a broker, subscribe to the topics that interest you, and publish messages on particular topics. Message payloads may be arbitrary bytes, but this implementation requires that the topics themselves be UTF-8. The C API is based on callbacks, which are mapped onto Rust closures. Everything starts with Mosquitto.
For example, publishing a message and confirming that it is sent:
let m = mosquitto_client::Mosquitto::new("test");

m.connect("localhost",1883)?;

// publish and get a message id
let our_mid = m.publish("bonzo/dog","hello dolly".as_bytes(), 2, false)?;

// and wait for confirmation for that message id
let mut mc = m.callbacks(());
mc.on_publish(|_,mid| {
    if mid == our_mid {
        m.disconnect().unwrap();
    }
});

// wait forever until explicit disconnect
// -1 means use default timeout on operations
m.loop_until_disconnect(-1)?;
Here we subscribe and listen to several topics:
let m = mosquitto_client::Mosquitto::new("test");

m.connect("localhost",1883)?;
let bonzo = m.subscribe("bonzo/#",0)?;
let frodo = m.subscribe("frodo/#",0)?;

let mut mc = m.callbacks(());
mc.on_message(|_,msg| {
    if ! msg.retained() { // not interested in any retained messages!
        if bonzo.matches(&msg) {
            println!("bonzo {:?}",msg);
        } else if frodo.matches(&msg) {
            println!("frodo {:?}",msg);
        }
    }
});

m.loop_forever(200)?;
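To make the `bonzo/#` and `frodo/#` filters above more concrete, here is a minimal, standalone sketch of how MQTT topic filters are matched against topic names. The function `topic_matches` is a hypothetical helper written for illustration; it is not part of this crate and makes no claim about how the matcher returned by subscribe works internally. It also skips validation (for instance, it does not check that `#` is the last level of the filter) and ignores the special handling of topics beginning with `$`.

```rust
/// Hypothetical helper (not part of mosquitto_client): returns true if
/// `topic` matches the MQTT subscription filter `filter`.
/// `+` matches exactly one level; `#` matches all remaining levels.
fn topic_matches(filter: &str, topic: &str) -> bool {
    let mut f = filter.split('/');
    let mut t = topic.split('/');
    loop {
        match (f.next(), t.next()) {
            // `#` matches whatever is left of the topic, including nothing
            (Some("#"), _) => return true,
            // `+` matches any single level, but the level must exist
            (Some("+"), Some(_)) => continue,
            // literal levels must be identical
            (Some(a), Some(b)) if a == b => continue,
            // both exhausted at the same time: full match
            (None, None) => return true,
            // anything else is a mismatch
            _ => return false,
        }
    }
}

fn main() {
    for topic in ["bonzo/dog", "bonzo", "frodo/ring/one", "bilbo/baggins"] {
        println!(
            "{}: bonzo/# -> {}, frodo/# -> {}",
            topic,
            topic_matches("bonzo/#", topic),
            topic_matches("frodo/#", topic)
        );
    }
}
```

Note that under these rules `bonzo/#` also matches the parent topic `bonzo` itself, which is easy to overlook when subscribing.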
You can always just do a regular match on the received topic name, which is available from the MosqMessage topic method.
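As a rough illustration of matching on the topic name directly, the sketch below routes on a plain string. It assumes the topic is available as a `&str` (which is what one would expect the MosqMessage topic method to provide, though that is an assumption here), and `describe` is a hypothetical function, not part of the crate.

```rust
/// Hypothetical handler: dispatch on the first level of a topic name.
fn describe(topic: &str) -> String {
    // Split "bonzo/dog" into ("bonzo", "dog"); topics without '/' yield None.
    match topic.split_once('/') {
        Some(("bonzo", rest)) => format!("bonzo subtopic {}", rest),
        Some(("frodo", rest)) => format!("frodo subtopic {}", rest),
        _ => format!("unhandled topic {}", topic),
    }
}

fn main() {
    println!("{}", describe("bonzo/dog"));
    println!("{}", describe("frodo/ring/one"));
    println!("{}", describe("other"));
}
```

Inside an on_message closure, the same `match` could be applied to the topic of the incoming message instead of a literal.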
The callbacks method can be given a value, and the first argument of any callback will be a mutable reference to that value (this avoids the usual shenanigans involved with closures having mutable borrows):
use std::{thread,time};

let m = mosquitto_client::Mosquitto::new("test");

m.connect("localhost",1883)?;
m.subscribe("bilbo/#",1)?;

let mt = m.clone();
thread::spawn(move || {
    let timeout = time::Duration::from_millis(500);
    for i in 0..5 {
        let msg = format!("hello #{}",i+1);
        mt.publish("bilbo/baggins",msg.as_bytes(), 1, false).unwrap();
        thread::sleep(timeout);
    }
    mt.disconnect().unwrap();
});

let mut mc = m.callbacks(Vec::new());
mc.on_message(|data,msg| {
    data.push(msg.text().to_string());
});

m.loop_until_disconnect(200)?;
assert_eq!(mc.data.len(),5);
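The idea of handing the callback a mutable reference to owned data can be shown without a broker. The following is only a sketch of the general pattern, using a hypothetical `Handlers` type and a hypothetical `deliver` method that stands in for the event loop; it is not the crate's Callbacks implementation. Because the holder owns the data and lends it out on each call, the closure itself captures nothing mutably and can be a plain `Fn`.

```rust
/// Hypothetical stand-in for a callbacks holder; not the crate's actual type.
struct Handlers<T> {
    /// State owned by the holder and lent to the callback on each call.
    data: T,
    on_message: Option<Box<dyn Fn(&mut T, &str)>>,
}

impl<T> Handlers<T> {
    fn new(data: T) -> Self {
        Handlers { data, on_message: None }
    }

    /// Register a callback; it receives the data as its first argument.
    fn on_message<F: Fn(&mut T, &str) + 'static>(&mut self, f: F) {
        self.on_message = Some(Box::new(f));
    }

    /// Simulates the event loop delivering one message payload as text.
    fn deliver(&mut self, text: &str) {
        if let Some(f) = &self.on_message {
            // `on_message` and `data` are distinct fields, so borrowing
            // one immutably and the other mutably is accepted.
            f(&mut self.data, text);
        }
    }
}

fn main() {
    let mut h = Handlers::new(Vec::new());
    h.on_message(|data: &mut Vec<String>, text| data.push(text.to_string()));
    h.deliver("hello #1");
    h.deliver("hello #2");
    assert_eq!(h.data.len(), 2);
}
```

The alternative, a closure that captures `&mut Vec<String>` from the enclosing scope, would keep that vector mutably borrowed for as long as the closure is registered, which is the kind of borrow trouble the data argument sidesteps.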
Modules§
Structs§
- Callbacks
- Handling mosquitto callbacks. This will pass a mutable reference to the contained data to the callbacks.
- Error
- Our Error type. Covers both regular Mosquitto errors and connection errors.
- MosqMessage
- A mosquitto message
- Mosquitto
- Mosquitto client
- TopicMatcher
- Matching subscription topics. Returned from Mosquitto::subscribe.
- Version
- Mosquitto version
Functions§
- version
- Get the version of the mosquitto client