pub struct Mosquitto { /* private fields */ }
Mosquitto client
Implementations§
Source§impl Mosquitto
impl Mosquitto
Sourcepub fn new(id: &str) -> Mosquitto
pub fn new(id: &str) -> Mosquitto
create a new mosquitto instance, providing a client name. Clients connecting to a broker must have unique names
Examples found in repository?
More examples
6fn go() -> Result<(),Box<Error>> {
7 let m = Mosquitto::new("test");
8
9 m.connect("localhost",1883)?;
10
11 // publish and get a message id
12 let our_mid = m.publish("bonzo/dog","hello dolly".as_bytes(), 2, false)?;
13
14 // and wait for confirmation for that message id
15 let mut mc = m.callbacks(());
16 mc.on_publish(|_,mid| {
17 if mid == our_mid {
18 m.disconnect().unwrap();
19 }
20 });
21
22 // wait forever until explicit disconnect
23 m.loop_until_disconnect(-1)?;
24 Ok(())
25}5fn run() -> mosq::Result<()> {
6 let m = Mosquitto::new("test");
7
8 m.connect_wait("localhost",1883,300)?;
9 let bilbo = m.subscribe("bilbo/#",1)?;
10
11 let mt = m.clone();
12 thread::spawn(move || {
13 for i in 0..5 {
14 let topic = format!("bilbo/{}",10*(i+1));
15 let data = format!("hello #{}",i);
16 mt.publish(&topic,data.as_bytes(), 1, false).unwrap();
17 }
18 });
19
20 let msgs = bilbo.receive_many(300)?;
21 for msg in msgs {
22 println!("topic {} text '{}'",msg.topic(),msg.text());
23 }
24 Ok(())
25}4fn main() {
5 let m = Mosquitto::new("test");
6
7 m.will_set("test/will",b"finished!",0,false).expect("can't set will");
8
9 m.connect("localhost",1883).expect("can't connect");
10 let bonzo = m.subscribe("bonzo/#",0).expect("can't subscribe to bonzo");
11 let frodo = m.subscribe("frodo/#",0).expect("can't subscribe to frodo");
12
13 // not interested in any retained messages!
14 let mut mc = m.callbacks(());
15 mc.on_message(|_,msg| {
16 if ! msg.retained() {
17 if bonzo.matches(&msg) {
18 println!("bonzo {:?}",msg);
19 } else
20 if frodo.matches(&msg) {
21 println!("frodo {:?}",msg);
22 m.disconnect().unwrap();
23 }
24 }
25 });
26
27 m.loop_forever(200).expect("broken loop");
28}5fn main() {
6 let m = Mosquitto::new("test");
7
8 m.connect("localhost",1883).expect("can't connect");
9 m.subscribe("bilbo/#",1).expect("can't subscribe to bonzo");
10
11 let mt = m.clone();
12 thread::spawn(move || {
13 let timeout = time::Duration::from_millis(500);
14 for _ in 0..5 {
15 mt.publish("bilbo/baggins","hello dolly".as_bytes(), 1, false).unwrap();
16 thread::sleep(timeout);
17 }
18 mt.disconnect().unwrap();
19 });
20
21 let mut mc = m.callbacks(0);
22 mc.on_message(|data,msg| {
23 println!("bilbo {:?}",msg);
24 *data += 1;
25 });
26 mc.on_disconnect(|_,rc| println!("disconnect {}",rc));
27
28
29 m.loop_until_disconnect(200).expect("broken loop");
30 println!("received {} messages",mc.data);
31}Sourcepub fn new_session(id: &str, clean_session: bool) -> Mosquitto
pub fn new_session(id: &str, clean_session: bool) -> Mosquitto
create a new mosquitto instance with specified clean session flag. Clients connecting to a broker must have unique names
Sourcepub fn callbacks<'a, T>(&'a self, data: T) -> Callbacks<'a, T>
pub fn callbacks<'a, T>(&'a self, data: T) -> Callbacks<'a, T>
create a Callbacks object so you can listen to events.
Examples found in repository?
6fn go() -> Result<(),Box<Error>> {
7 let m = Mosquitto::new("test");
8
9 m.connect("localhost",1883)?;
10
11 // publish and get a message id
12 let our_mid = m.publish("bonzo/dog","hello dolly".as_bytes(), 2, false)?;
13
14 // and wait for confirmation for that message id
15 let mut mc = m.callbacks(());
16 mc.on_publish(|_,mid| {
17 if mid == our_mid {
18 m.disconnect().unwrap();
19 }
20 });
21
22 // wait forever until explicit disconnect
23 m.loop_until_disconnect(-1)?;
24 Ok(())
25}More examples
4fn main() {
5 let m = Mosquitto::new("test");
6
7 m.will_set("test/will",b"finished!",0,false).expect("can't set will");
8
9 m.connect("localhost",1883).expect("can't connect");
10 let bonzo = m.subscribe("bonzo/#",0).expect("can't subscribe to bonzo");
11 let frodo = m.subscribe("frodo/#",0).expect("can't subscribe to frodo");
12
13 // not interested in any retained messages!
14 let mut mc = m.callbacks(());
15 mc.on_message(|_,msg| {
16 if ! msg.retained() {
17 if bonzo.matches(&msg) {
18 println!("bonzo {:?}",msg);
19 } else
20 if frodo.matches(&msg) {
21 println!("frodo {:?}",msg);
22 m.disconnect().unwrap();
23 }
24 }
25 });
26
27 m.loop_forever(200).expect("broken loop");
28}5fn main() {
6 let m = Mosquitto::new("test");
7
8 m.connect("localhost",1883).expect("can't connect");
9 m.subscribe("bilbo/#",1).expect("can't subscribe to bonzo");
10
11 let mt = m.clone();
12 thread::spawn(move || {
13 let timeout = time::Duration::from_millis(500);
14 for _ in 0..5 {
15 mt.publish("bilbo/baggins","hello dolly".as_bytes(), 1, false).unwrap();
16 thread::sleep(timeout);
17 }
18 mt.disconnect().unwrap();
19 });
20
21 let mut mc = m.callbacks(0);
22 mc.on_message(|data,msg| {
23 println!("bilbo {:?}",msg);
24 *data += 1;
25 });
26 mc.on_disconnect(|_,rc| println!("disconnect {}",rc));
27
28
29 m.loop_until_disconnect(200).expect("broken loop");
30 println!("received {} messages",mc.data);
31}Sourcepub fn connect(&self, host: &str, port: u32) -> Result<()>
pub fn connect(&self, host: &str, port: u32) -> Result<()>
connect to the broker. You can only be fully sure that a connection succeeded once the on_connect callback has been called with a return code of zero
Examples found in repository?
6fn go() -> Result<(),Box<Error>> {
7 let m = Mosquitto::new("test");
8
9 m.connect("localhost",1883)?;
10
11 // publish and get a message id
12 let our_mid = m.publish("bonzo/dog","hello dolly".as_bytes(), 2, false)?;
13
14 // and wait for confirmation for that message id
15 let mut mc = m.callbacks(());
16 mc.on_publish(|_,mid| {
17 if mid == our_mid {
18 m.disconnect().unwrap();
19 }
20 });
21
22 // wait forever until explicit disconnect
23 m.loop_until_disconnect(-1)?;
24 Ok(())
25}More examples
4fn main() {
5 let m = Mosquitto::new("test");
6
7 m.will_set("test/will",b"finished!",0,false).expect("can't set will");
8
9 m.connect("localhost",1883).expect("can't connect");
10 let bonzo = m.subscribe("bonzo/#",0).expect("can't subscribe to bonzo");
11 let frodo = m.subscribe("frodo/#",0).expect("can't subscribe to frodo");
12
13 // not interested in any retained messages!
14 let mut mc = m.callbacks(());
15 mc.on_message(|_,msg| {
16 if ! msg.retained() {
17 if bonzo.matches(&msg) {
18 println!("bonzo {:?}",msg);
19 } else
20 if frodo.matches(&msg) {
21 println!("frodo {:?}",msg);
22 m.disconnect().unwrap();
23 }
24 }
25 });
26
27 m.loop_forever(200).expect("broken loop");
28}5fn main() {
6 let m = Mosquitto::new("test");
7
8 m.connect("localhost",1883).expect("can't connect");
9 m.subscribe("bilbo/#",1).expect("can't subscribe to bonzo");
10
11 let mt = m.clone();
12 thread::spawn(move || {
13 let timeout = time::Duration::from_millis(500);
14 for _ in 0..5 {
15 mt.publish("bilbo/baggins","hello dolly".as_bytes(), 1, false).unwrap();
16 thread::sleep(timeout);
17 }
18 mt.disconnect().unwrap();
19 });
20
21 let mut mc = m.callbacks(0);
22 mc.on_message(|data,msg| {
23 println!("bilbo {:?}",msg);
24 *data += 1;
25 });
26 mc.on_disconnect(|_,rc| println!("disconnect {}",rc));
27
28
29 m.loop_until_disconnect(200).expect("broken loop");
30 println!("received {} messages",mc.data);
31}Sourcepub fn connect_wait(&self, host: &str, port: u32, millis: i32) -> Result<()>
pub fn connect_wait(&self, host: &str, port: u32, millis: i32) -> Result<()>
connect to the broker, waiting for success.
Examples found in repository?
More examples
5fn run() -> mosq::Result<()> {
6 let m = Mosquitto::new("test");
7
8 m.connect_wait("localhost",1883,300)?;
9 let bilbo = m.subscribe("bilbo/#",1)?;
10
11 let mt = m.clone();
12 thread::spawn(move || {
13 for i in 0..5 {
14 let topic = format!("bilbo/{}",10*(i+1));
15 let data = format!("hello #{}",i);
16 mt.publish(&topic,data.as_bytes(), 1, false).unwrap();
17 }
18 });
19
20 let msgs = bilbo.receive_many(300)?;
21 for msg in msgs {
22 println!("topic {} text '{}'",msg.topic(),msg.text());
23 }
24 Ok(())
25}pub fn reconnect_delay_set( &self, delay: u32, delay_max: u32, exponential_backoff: bool, ) -> Result<()>
Sourcepub fn subscribe<'a>(&'a self, sub: &str, qos: u32) -> Result<TopicMatcher<'a>>
pub fn subscribe<'a>(&'a self, sub: &str, qos: u32) -> Result<TopicMatcher<'a>>
subscribe to an MQTT topic, with a desired quality-of-service.
The returned value can be used to directly match
against received messages, and has a mid field identifying
the subscribing request. on_subscribe will be called with this
identifier.
Examples found in repository?
5fn run() -> mosq::Result<()> {
6 let m = Mosquitto::new("test");
7
8 m.connect_wait("localhost",1883,300)?;
9 let bilbo = m.subscribe("bilbo/#",1)?;
10
11 let mt = m.clone();
12 thread::spawn(move || {
13 for i in 0..5 {
14 let topic = format!("bilbo/{}",10*(i+1));
15 let data = format!("hello #{}",i);
16 mt.publish(&topic,data.as_bytes(), 1, false).unwrap();
17 }
18 });
19
20 let msgs = bilbo.receive_many(300)?;
21 for msg in msgs {
22 println!("topic {} text '{}'",msg.topic(),msg.text());
23 }
24 Ok(())
25}More examples
4fn main() {
5 let m = Mosquitto::new("test");
6
7 m.will_set("test/will",b"finished!",0,false).expect("can't set will");
8
9 m.connect("localhost",1883).expect("can't connect");
10 let bonzo = m.subscribe("bonzo/#",0).expect("can't subscribe to bonzo");
11 let frodo = m.subscribe("frodo/#",0).expect("can't subscribe to frodo");
12
13 // not interested in any retained messages!
14 let mut mc = m.callbacks(());
15 mc.on_message(|_,msg| {
16 if ! msg.retained() {
17 if bonzo.matches(&msg) {
18 println!("bonzo {:?}",msg);
19 } else
20 if frodo.matches(&msg) {
21 println!("frodo {:?}",msg);
22 m.disconnect().unwrap();
23 }
24 }
25 });
26
27 m.loop_forever(200).expect("broken loop");
28}5fn main() {
6 let m = Mosquitto::new("test");
7
8 m.connect("localhost",1883).expect("can't connect");
9 m.subscribe("bilbo/#",1).expect("can't subscribe to bonzo");
10
11 let mt = m.clone();
12 thread::spawn(move || {
13 let timeout = time::Duration::from_millis(500);
14 for _ in 0..5 {
15 mt.publish("bilbo/baggins","hello dolly".as_bytes(), 1, false).unwrap();
16 thread::sleep(timeout);
17 }
18 mt.disconnect().unwrap();
19 });
20
21 let mut mc = m.callbacks(0);
22 mc.on_message(|data,msg| {
23 println!("bilbo {:?}",msg);
24 *data += 1;
25 });
26 mc.on_disconnect(|_,rc| println!("disconnect {}",rc));
27
28
29 m.loop_until_disconnect(200).expect("broken loop");
30 println!("received {} messages",mc.data);
31}Sourcepub fn unsubscribe(&self, sub: &str) -> Result<i32>
pub fn unsubscribe(&self, sub: &str) -> Result<i32>
unsubscribe from an MQTT topic - on_unsubscribe callback will be called.
Sourcepub fn publish(
&self,
topic: &str,
payload: &[u8],
qos: u32,
retain: bool,
) -> Result<i32>
pub fn publish( &self, topic: &str, payload: &[u8], qos: u32, retain: bool, ) -> Result<i32>
publish an MQTT message to the broker, returning message id.
Quality-of-service and whether retained can be specified.
To be sure the message was published, check the message id passed to the on_publish callback
Examples found in repository?
6fn go() -> Result<(),Box<Error>> {
7 let m = Mosquitto::new("test");
8
9 m.connect("localhost",1883)?;
10
11 // publish and get a message id
12 let our_mid = m.publish("bonzo/dog","hello dolly".as_bytes(), 2, false)?;
13
14 // and wait for confirmation for that message id
15 let mut mc = m.callbacks(());
16 mc.on_publish(|_,mid| {
17 if mid == our_mid {
18 m.disconnect().unwrap();
19 }
20 });
21
22 // wait forever until explicit disconnect
23 m.loop_until_disconnect(-1)?;
24 Ok(())
25}More examples
5fn run() -> mosq::Result<()> {
6 let m = Mosquitto::new("test");
7
8 m.connect_wait("localhost",1883,300)?;
9 let bilbo = m.subscribe("bilbo/#",1)?;
10
11 let mt = m.clone();
12 thread::spawn(move || {
13 for i in 0..5 {
14 let topic = format!("bilbo/{}",10*(i+1));
15 let data = format!("hello #{}",i);
16 mt.publish(&topic,data.as_bytes(), 1, false).unwrap();
17 }
18 });
19
20 let msgs = bilbo.receive_many(300)?;
21 for msg in msgs {
22 println!("topic {} text '{}'",msg.topic(),msg.text());
23 }
24 Ok(())
25}5fn main() {
6 let m = Mosquitto::new("test");
7
8 m.connect("localhost",1883).expect("can't connect");
9 m.subscribe("bilbo/#",1).expect("can't subscribe to bonzo");
10
11 let mt = m.clone();
12 thread::spawn(move || {
13 let timeout = time::Duration::from_millis(500);
14 for _ in 0..5 {
15 mt.publish("bilbo/baggins","hello dolly".as_bytes(), 1, false).unwrap();
16 thread::sleep(timeout);
17 }
18 mt.disconnect().unwrap();
19 });
20
21 let mut mc = m.callbacks(0);
22 mc.on_message(|data,msg| {
23 println!("bilbo {:?}",msg);
24 *data += 1;
25 });
26 mc.on_disconnect(|_,rc| println!("disconnect {}",rc));
27
28
29 m.loop_until_disconnect(200).expect("broken loop");
30 println!("received {} messages",mc.data);
31}Sourcepub fn will_set(
&self,
topic: &str,
payload: &[u8],
qos: u32,
retain: bool,
) -> Result<()>
pub fn will_set( &self, topic: &str, payload: &[u8], qos: u32, retain: bool, ) -> Result<()>
Examples found in repository?
4fn main() {
5 let m = Mosquitto::new("test");
6
7 m.will_set("test/will",b"finished!",0,false).expect("can't set will");
8
9 m.connect("localhost",1883).expect("can't connect");
10 let bonzo = m.subscribe("bonzo/#",0).expect("can't subscribe to bonzo");
11 let frodo = m.subscribe("frodo/#",0).expect("can't subscribe to frodo");
12
13 // not interested in any retained messages!
14 let mut mc = m.callbacks(());
15 mc.on_message(|_,msg| {
16 if ! msg.retained() {
17 if bonzo.matches(&msg) {
18 println!("bonzo {:?}",msg);
19 } else
20 if frodo.matches(&msg) {
21 println!("frodo {:?}",msg);
22 m.disconnect().unwrap();
23 }
24 }
25 });
26
27 m.loop_forever(200).expect("broken loop");
28}pub fn will_clear(&self) -> Result<()>
Sourcepub fn publish_wait(
&self,
topic: &str,
payload: &[u8],
qos: u32,
retain: bool,
millis: i32,
) -> Result<i32>
pub fn publish_wait( &self, topic: &str, payload: &[u8], qos: u32, retain: bool, millis: i32, ) -> Result<i32>
publish an MQTT message to the broker, returning message id after waiting for successful publish
Sourcepub fn disconnect(&self) -> Result<()>
pub fn disconnect(&self) -> Result<()>
explicitly disconnect from the broker.
Examples found in repository?
6fn go() -> Result<(),Box<Error>> {
7 let m = Mosquitto::new("test");
8
9 m.connect("localhost",1883)?;
10
11 // publish and get a message id
12 let our_mid = m.publish("bonzo/dog","hello dolly".as_bytes(), 2, false)?;
13
14 // and wait for confirmation for that message id
15 let mut mc = m.callbacks(());
16 mc.on_publish(|_,mid| {
17 if mid == our_mid {
18 m.disconnect().unwrap();
19 }
20 });
21
22 // wait forever until explicit disconnect
23 m.loop_until_disconnect(-1)?;
24 Ok(())
25}More examples
4fn main() {
5 let m = Mosquitto::new("test");
6
7 m.will_set("test/will",b"finished!",0,false).expect("can't set will");
8
9 m.connect("localhost",1883).expect("can't connect");
10 let bonzo = m.subscribe("bonzo/#",0).expect("can't subscribe to bonzo");
11 let frodo = m.subscribe("frodo/#",0).expect("can't subscribe to frodo");
12
13 // not interested in any retained messages!
14 let mut mc = m.callbacks(());
15 mc.on_message(|_,msg| {
16 if ! msg.retained() {
17 if bonzo.matches(&msg) {
18 println!("bonzo {:?}",msg);
19 } else
20 if frodo.matches(&msg) {
21 println!("frodo {:?}",msg);
22 m.disconnect().unwrap();
23 }
24 }
25 });
26
27 m.loop_forever(200).expect("broken loop");
28}5fn main() {
6 let m = Mosquitto::new("test");
7
8 m.connect("localhost",1883).expect("can't connect");
9 m.subscribe("bilbo/#",1).expect("can't subscribe to bonzo");
10
11 let mt = m.clone();
12 thread::spawn(move || {
13 let timeout = time::Duration::from_millis(500);
14 for _ in 0..5 {
15 mt.publish("bilbo/baggins","hello dolly".as_bytes(), 1, false).unwrap();
16 thread::sleep(timeout);
17 }
18 mt.disconnect().unwrap();
19 });
20
21 let mut mc = m.callbacks(0);
22 mc.on_message(|data,msg| {
23 println!("bilbo {:?}",msg);
24 *data += 1;
25 });
26 mc.on_disconnect(|_,rc| println!("disconnect {}",rc));
27
28
29 m.loop_until_disconnect(200).expect("broken loop");
30 println!("received {} messages",mc.data);
31}Sourcepub fn do_loop(&self, timeout: i32) -> Result<()>
pub fn do_loop(&self, timeout: i32) -> Result<()>
process network events for at most timeout milliseconds.
-1 will mean the default, 1000ms.
Sourcepub fn loop_forever(&self, timeout: i32) -> Result<()>
pub fn loop_forever(&self, timeout: i32) -> Result<()>
process network events. This will handle intermittent disconnects for you, but will return after an explicit disconnect() call
Examples found in repository?
4fn main() {
5 let m = Mosquitto::new("test");
6
7 m.will_set("test/will",b"finished!",0,false).expect("can't set will");
8
9 m.connect("localhost",1883).expect("can't connect");
10 let bonzo = m.subscribe("bonzo/#",0).expect("can't subscribe to bonzo");
11 let frodo = m.subscribe("frodo/#",0).expect("can't subscribe to frodo");
12
13 // not interested in any retained messages!
14 let mut mc = m.callbacks(());
15 mc.on_message(|_,msg| {
16 if ! msg.retained() {
17 if bonzo.matches(&msg) {
18 println!("bonzo {:?}",msg);
19 } else
20 if frodo.matches(&msg) {
21 println!("frodo {:?}",msg);
22 m.disconnect().unwrap();
23 }
24 }
25 });
26
27 m.loop_forever(200).expect("broken loop");
28}Sourcepub fn loop_until_disconnect(&self, timeout: i32) -> Result<()>
pub fn loop_until_disconnect(&self, timeout: i32) -> Result<()>
loop forever, but do not regard an explicit disconnect as an error.
Examples found in repository?
6fn go() -> Result<(),Box<Error>> {
7 let m = Mosquitto::new("test");
8
9 m.connect("localhost",1883)?;
10
11 // publish and get a message id
12 let our_mid = m.publish("bonzo/dog","hello dolly".as_bytes(), 2, false)?;
13
14 // and wait for confirmation for that message id
15 let mut mc = m.callbacks(());
16 mc.on_publish(|_,mid| {
17 if mid == our_mid {
18 m.disconnect().unwrap();
19 }
20 });
21
22 // wait forever until explicit disconnect
23 m.loop_until_disconnect(-1)?;
24 Ok(())
25}More examples
5fn main() {
6 let m = Mosquitto::new("test");
7
8 m.connect("localhost",1883).expect("can't connect");
9 m.subscribe("bilbo/#",1).expect("can't subscribe to bonzo");
10
11 let mt = m.clone();
12 thread::spawn(move || {
13 let timeout = time::Duration::from_millis(500);
14 for _ in 0..5 {
15 mt.publish("bilbo/baggins","hello dolly".as_bytes(), 1, false).unwrap();
16 thread::sleep(timeout);
17 }
18 mt.disconnect().unwrap();
19 });
20
21 let mut mc = m.callbacks(0);
22 mc.on_message(|data,msg| {
23 println!("bilbo {:?}",msg);
24 *data += 1;
25 });
26 mc.on_disconnect(|_,rc| println!("disconnect {}",rc));
27
28
29 m.loop_until_disconnect(200).expect("broken loop");
30 println!("received {} messages",mc.data);
31}Sourcepub fn tls_set<P1, P2, P3>(
&self,
cafile: P1,
certfile: P2,
keyfile: P3,
passphrase: Option<&str>,
) -> Result<()>
pub fn tls_set<P1, P2, P3>( &self, cafile: P1, certfile: P2, keyfile: P3, passphrase: Option<&str>, ) -> Result<()>
Set TLS parameters
cafile is a file containing the PEM encoded trusted CA certificate
certfile is a file containing the PEM encoded certificate file for this client.
keyfile is a file containing the PEM encoded private key for this client.
passphrase is the password for the private key, if it is encrypted
Sourcepub fn tls_psk_set(
&self,
psk: &str,
identity: &str,
ciphers: Option<&str>,
) -> Result<()>
pub fn tls_psk_set( &self, psk: &str, identity: &str, ciphers: Option<&str>, ) -> Result<()>
Set TLS PSK parameters
psk is the pre-shared-key in hex format with no leading “0x”
identity is the identity of this client. May be used as the username
ciphers is an optional string describing the PSK ciphers available for use