mosquitto_client/
lib.rs

1//! Mosquitto is a popular MQTT broker implemented in C. Although there are pure
2//! Rust MQTT clients, it is still useful to have a binding to the Mosquitto client.
3//!
4//! The basic story is that you connect to a broker, _subscribing_ to topics that
5//! interest you and _publishing_ messages on a particular topic. The messages
6//! may be any arbitrary bytes, but this implementation does require that the topics
7//! themselves be UTF-8.  The C API is based on callbacks, which are mapped onto
8//! Rust closures. Everything starts with [Mosquitto](struct.Mosquitto.html).
9//!
10//! For example, publishing a message and confirming that it is sent:
11//!
12//! ```rust
13//! # fn run() -> std::result::Result<(),Box<std::error::Error>> {
14//! let m = mosquitto_client::Mosquitto::new("test");
15//!
16//! m.connect("localhost",1883)?;
17//!
18//! // publish and get a message id
19//! let our_mid = m.publish("bonzo/dog","hello dolly".as_bytes(), 2, false)?;
20//!
21//! // and wait for confirmation for that message id
22//! let mut mc = m.callbacks(());
23//! mc.on_publish(|_,mid| {
24//!     if mid == our_mid {
25//!         m.disconnect().unwrap();
26//!     }
27//! });
28//!
29//! // wait forever until explicit disconnect
30//! // -1 means use default timeout on operations
31//! m.loop_until_disconnect(-1)?;
32//! # Ok(())
33//! # }
34//! #
35//! # fn main() {
36//! #    run().unwrap();
37//! # }
38//! ```
39//!
40//! Here we subscribe and listen to several topics:
41//!
42//! ```rust,no_run
43//! # fn run() -> std::result::Result<(),Box<std::error::Error>> {
44//! let m = mosquitto_client::Mosquitto::new("test");
45//!
46//! m.connect("localhost",1883)?;
47//! let bonzo = m.subscribe("bonzo/#",0)?;
48//! let frodo = m.subscribe("frodo/#",0)?;
49//!
50//! let mut mc = m.callbacks(());
51//! mc.on_message(|_,msg| {
52//!     if ! msg.retained() { // not interested in any retained messages!
53//!         if bonzo.matches(&msg) {
54//!             println!("bonzo {:?}",msg);
55//!         } else
56//!         if frodo.matches(&msg) {
57//!             println!("frodo {:?}",msg);
58//!         }
59//!     }
60//! });
61//!
62//! m.loop_forever(200)?;
63//! # Ok(())
64//! # }
65//! #
66//! # fn main() {
67//! #    run().unwrap();
68//! # }
69//! ```
70//!
//! You can always just do a regular match on the received topic name
72//! from the [MosqMessage](struct.MosqMessage.html) `topic` method.
73//!
74//! The `callbacks` method can be given a value, and the _first_ argument of any
75//! callback will be a mutable reference to that value (this avoids the usual
76//! shenanigans involved with closures having mutable borrows)
77//!
78//! ```rust
79//! # fn run() -> std::result::Result<(),Box<std::error::Error>> {
80//! use std::{thread,time};
81//!
82//! let m = mosquitto_client::Mosquitto::new("test");
83//!
84//! m.connect("localhost",1883)?;
85//! m.subscribe("bilbo/#",1)?;
86//!
87//! let mt = m.clone();
88//! thread::spawn(move || {
89//!     let timeout = time::Duration::from_millis(500);
90//!     for i in 0..5 {
91//!         let msg = format!("hello #{}",i+1);
92//!         mt.publish("bilbo/baggins",msg.as_bytes(), 1, false).unwrap();
93//!         thread::sleep(timeout);
94//!     }
95//!     mt.disconnect().unwrap();
96//! });
97//!
98//! let mut mc = m.callbacks(Vec::new());
99//! mc.on_message(|data,msg| {
100//!     data.push(msg.text().to_string());
101//! });
102//!
103//! m.loop_until_disconnect(200)?;
104//! assert_eq!(mc.data.len(),5);
105//! # Ok(())
106//! # }
107//! #
108//! # fn main() {
109//! #    run().unwrap();
110//! # }
111//! ```
112//!
113use std::os::raw::{c_int,c_char};
114use std::ffi::{CStr,CString};
115use std::error;
116use std::fmt;
117use std::path::Path;
118use std::time::{Duration,Instant};
119use std::fmt::{Display,Debug};
120use std::ptr::null;
121use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
122
/// Count of live owning `Mosquitto` instances.
/// The instance that moves it from zero initializes the C library, and the
/// owning instance that brings it back to zero is responsible for teardown.
static INSTANCES: AtomicUsize = AtomicUsize::new(0);
124
125pub mod sys;
126
127use sys::*;
128
/// The error type used throughout this crate.
/// It describes both failed Mosquitto calls and failed connection attempts.
#[derive(Debug)]
pub struct Error {
    /// message shown by `Display`
    text: String,
    /// numeric code that accompanied the failure
    errcode: i32,
    /// set by `Error::new_connect`, cleared by `Error::new`
    connect: bool,
}

impl Display for Error {
    /// Writes the stored message text and nothing else.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(&self.text)
    }
}

/// Shorthand for a `Result` whose error is this crate's [Error](struct.Error.html).
pub type Result<T> = ::std::result::Result<T, Error>;
145
146impl Error {
147    /// create a new Mosquitto error
148    pub fn new(msg: &str, rc: c_int) -> Error {
149        Error{text: format!("{}: {}",msg,mosq_strerror(rc)), errcode: rc, connect: false}
150    }
151
152    /// create a new connection error
153    pub fn new_connect(rc: c_int) -> Error {
154        Error{text: connect_error(rc).into(), errcode: rc, connect: true}
155    }
156
157    fn result(call: &str, rc: c_int) -> Result<()> {
158        if rc != 0 {
159            Err(Error::new(call,rc))
160        } else {
161            Ok(())
162        }
163    }
164
165    /// underlying error code
166    pub fn error(&self) -> i32 {
167        self.errcode
168    }
169}
170
171impl error::Error for Error {
172    fn description(&self) -> &str {
173        &self.text
174    }
175}
176
/// Copy a Rust string into an owned, nul-terminated C string.
///
/// This will **panic** if `s` contains a nul byte.
fn cs(s: &str) -> CString {
    let converted = CString::new(s);
    converted.expect("Text contained nul bytes")
}
180
/// Copy a filesystem path into an owned, nul-terminated C string.
///
/// On Unix the raw bytes of the path are passed through unchanged, so file
/// names that are not valid UTF-8 are accepted.
///
/// This will **panic** if the path contains a nul byte.
#[cfg(unix)]
fn cpath(p: &Path) -> CString {
    use std::os::unix::ffi::OsStrExt;
    CString::new(p.as_os_str().as_bytes()).expect("Text contained nul bytes")
}

/// Copy a filesystem path into an owned, nul-terminated C string.
///
/// This will **panic** if the path is not valid UTF-8 or contains a nul byte.
#[cfg(not(unix))]
fn cpath(p: &Path) -> CString {
    cs(p.to_str().expect("Non UTF-8 filename"))
}
185
186/// A mosquitto message
187pub struct MosqMessage {
188    msg: *const Message,
189    owned: bool
190}
191
192use std::mem;
193
// The C allocator; `MosqMessage::new` uses it to get storage for a copied message.
#[link(name = "c")]
extern "C" {
    fn malloc(size: usize) -> *mut u8;
}
198
199impl MosqMessage {
200    fn new(msg: *const Message, clone: bool) -> MosqMessage {
201        if clone {
202            unsafe {
203                let m = malloc(mem::size_of::<Message>()) as *mut Message;
204                mosquitto_message_copy(m,msg);
205                MosqMessage{msg:m,owned:true}
206            }
207        } else {
208            MosqMessage{msg:msg, owned:false}
209        }
210    }
211
212    fn msg_ref(&self) -> &Message {
213        unsafe { &*self.msg }
214    }
215
216    /// the topic of the message.
217    /// This will **panic** if the topic isn't valid UTF-8
218    pub fn topic(&self) -> &str {
219        unsafe { CStr::from_ptr(self.msg_ref().topic).to_str().expect("Topic was not UTF-8")  }
220    }
221
222    /// the payload as bytes
223    pub fn payload(&self) -> &[u8] {
224        let msg = self.msg_ref();
225        unsafe {
226            ::std::slice::from_raw_parts(
227                msg.payload,
228                msg.payloadlen as usize
229            )
230        }
231    }
232
233    /// the payload as text.
234    /// This will **panic** if the payload was not valid UTF-8
235    pub fn text(&self) -> &str {
236        ::std::str::from_utf8(self.payload()).expect("Payload was not UTF-8")
237    }
238
239    /// the quality-of-service of the message.
240    /// The desired QoS is specified when we subscribe.
241    pub fn qos(&self) -> u32 {
242        self.msg_ref().qos as u32
243    }
244
245    /// was the message retained by the broker?
246    /// True if we received this as a retained message.
247    /// Subsequent messages marked as retained will not set this.
248    pub fn retained(&self) -> bool {
249        if self.msg_ref().retain > 0 {true} else {false}
250    }
251}
252
253impl Debug for MosqMessage {
254    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
255        let this = self.msg_ref();
256        write!(f,"{}: mid {} len {} qos {} retain {}",self.topic(),
257            this.mid, this.payloadlen, this.qos, this.retain)
258    }
259}
260
261impl Clone for MosqMessage {
262    fn clone(&self) -> Self {
263        MosqMessage::new(self.msg,true)
264    }
265}
266
267impl Drop for MosqMessage {
268    fn drop(&mut self) {
269        // eprintln!("dropping {}",self.owned);
270        if self.owned {
271            unsafe { mosquitto_message_free(&self.msg) };
272        }
273    }
274}
275
276/// Matching subscription topics.
277/// Returned from [Mosquitto::subscribe](struct.Mosquitto.html#method.subscribe).
278pub struct TopicMatcher<'a> {
279    sub: CString,
280    /// the subscription id.
281    pub mid: i32,
282    mosq: &'a Mosquitto,
283}
284
285impl <'a>TopicMatcher<'a> {
286    fn new(sub: CString, mid: i32, mosq: &'a Mosquitto) -> TopicMatcher<'a> {
287        TopicMatcher{sub: sub, mid: mid, mosq: mosq}
288    }
289
290    /// true if a message matches a subscription topic
291    pub fn matches(&self, msg: &MosqMessage) -> bool {
292        let mut matched: u8 = 0;
293        unsafe {
294             mosquitto_topic_matches_sub(self.sub.as_ptr(),msg.msg_ref().topic, &mut matched);
295        }
296        if matched > 0 {true} else {false}
297    }
298
299    fn receive(&self, millis: i32, just_one: bool) -> Result<Vec<MosqMessage>> {
300        let t = Instant::now();
301        let wait = Duration::from_millis(millis as u64);
302        let mut mc = self.mosq.callbacks(Vec::new());
303        mc.on_message(|data,msg| {
304            if self.matches(&msg) {
305                data.push(MosqMessage::new(msg.msg,true));
306            }
307        });
308
309        while t.elapsed() < wait {
310            self.mosq.do_loop(millis)?;
311            if just_one && mc.data.len() > 0 {
312                break;
313            }
314        }
315
316        if mc.data.len() > 0 { // we got mail!
317            // take results out of the sticky grip of mc data
318            let mut res = Vec::new();
319            ::std::mem::swap(&mut mc.data, &mut res);
320            Ok(res)
321        } else { // no messages considered an Error...
322            Err(Error::new("receive",MOSQ_ERR_TIMEOUT))
323        }
324
325    }
326
327    /// receive and return messages matching this topic, until timeout
328    pub fn receive_many(&self, millis: i32) -> Result<Vec<MosqMessage>> {
329        self.receive(millis,false)
330    }
331
332
333    /// receive and return exactly one message matching this topic
334    pub fn receive_one(&self, millis: i32) -> Result<MosqMessage> {
335        self.receive(millis,true).map(|mut v| v.remove(0))
336    }
337}
338
/// Mosquitto version
pub struct Version {
    /// major version number
    pub major: u32,
    /// minor version number
    pub minor: u32,
    /// revision number
    pub revision: u32
}

impl Display for Version {
    /// Formats the version as `major.minor.revision`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f,"{}.{}.{}",self.major,self.minor,self.revision)
    }
}
351
352/// get version of the mosquitto client
353pub fn version() -> Version {
354    let mut major: c_int = 0;
355    let mut minor: c_int = 0;
356    let mut revision: c_int = 0;
357
358    unsafe { mosquitto_lib_version(&mut major,&mut minor,&mut revision); }
359
360    Version{major: major as u32,minor: minor as u32,revision: revision as u32}
361}
362
363/// Mosquitto client
364pub struct Mosquitto {
365    mosq: *const Mosq,
366    owned: bool,
367}
368
369impl Mosquitto {
370
371    /// create a new mosquitto instance, providing a client name.
372    /// Clients connecting to a broker must have unique names
373    pub fn new(id: &str) -> Mosquitto {
374        Mosquitto::new_session(id, true)
375    }
376
377    /// create a new mosquitto instance with specified clean session flag.
378    /// Clients connecting to a broker must have unique names
379    pub fn new_session(id: &str, clean_session: bool) -> Mosquitto {
380        if INSTANCES.fetch_add(1, Ordering::SeqCst) == 0 {
381            // println!("initializing mosq");
382            unsafe { mosquitto_lib_init(); }
383        }
384        let mosq = unsafe {
385            mosquitto_new(cs(id).as_ptr(),if clean_session {1} else {0},null())
386        };
387        Mosquitto{
388            mosq: mosq,
389            owned: true
390        }
391    }
392
393    /// create a Callback object so you can listen to events.
394    pub fn callbacks<'a,T>(&'a self, data: T) -> Callbacks<'a,T> {
395        Callbacks::new(self,data)
396    }
397
398    /// connect to the broker.
399    /// You can only be fully sure that a connection succeeds
400    /// after the [on_connect](struct.Callbacks#method.on_connect) callback returns non-zero
401    pub fn connect(&self, host: &str, port: u32) -> Result<()> {
402        Error::result("connect",unsafe {
403             mosquitto_connect(self.mosq,cs(host).as_ptr(),port as c_int,0)
404        })
405    }
406
407    /// connect to the broker, waiting for success.
408    pub fn connect_wait(&self, host: &str, port: u32, millis: i32) -> Result<()> {
409        self.connect(host,port)?;
410        let t = Instant::now();
411        let wait = Duration::from_millis(millis as u64);
412        let mut callback = self.callbacks(MOSQ_CONNECT_ERR_TIMEOUT);
413        callback.on_connect(|data, rc| {
414            *data = rc;
415        });
416        loop {
417            self.do_loop(millis)?;
418            if callback.data == MOSQ_CONNECT_ERR_OK {
419                return Ok(())
420            };
421            if t.elapsed() > wait {
422                break;
423            }
424        }
425        Err(Error::new_connect(callback.data))
426
427    }
428
429    /// call if you wish to use Mosquitto in a multithreaded environment.
430    pub fn threaded(&self) {
431        unsafe { mosquitto_threaded_set(self.mosq,1); }
432    }
433
434    /// reconnect to the broker
435    pub fn reconnect(&self) -> Result<()> {
436        Error::result("reconnect",unsafe {
437            mosquitto_reconnect(self.mosq)
438        })
439    }
440
441    pub fn reconnect_delay_set(&self,delay: u32, delay_max: u32, exponential_backoff: bool) -> Result<()> {
442        Error::result("delay_set",unsafe {
443            mosquitto_reconnect_delay_set(self.mosq,
444                delay as c_int,
445                delay_max as c_int,
446                exponential_backoff as u8
447        )})
448
449    }
450
451    /// subscribe to an MQTT topic, with a desired quality-of-service.
452    /// The returned value can be used to directly match
453    /// against received messages, and has a `mid` field identifying
454    /// the subscribing request. on_subscribe will be called with this
455    /// identifier.
456    pub fn subscribe<'a>(&'a self, sub: &str, qos: u32) -> Result<TopicMatcher<'a>> {
457        let mut mid: c_int = 0;
458        let sub = cs(sub);
459        let rc = unsafe { mosquitto_subscribe(self.mosq,&mut mid,sub.as_ptr(),qos as c_int) };
460        if rc == 0 {
461            Ok(TopicMatcher::new(sub,mid,self))
462        } else {
463            Err(Error::new("subscribe",rc))
464        }
465    }
466
467    /// unsubcribe from an MQTT topic - `on_unsubscribe` callback will be called.
468    pub fn unsubscribe(&self, sub: &str) -> Result<i32> {
469        let mut mid = 0;
470        let rc = unsafe { mosquitto_unsubscribe(self.mosq,&mut mid, cs(sub).as_ptr()) };
471        if rc == 0 {
472            Ok(mid as i32)
473        } else {
474            Err(Error::new("unsubscribe",rc))
475        }
476    }
477
478    /// publish an MQTT message to the broker, returning message id.
479    /// Quality-of-service and whether retained can be specified.
480    /// To be sure, check the message id passed to the `on_publish` callback
481    pub fn publish(&self, topic: &str, payload: &[u8], qos: u32, retain: bool) -> Result<i32> {
482        let mut mid = 0;
483
484        let rc = unsafe { mosquitto_publish(
485            self.mosq,&mut mid, cs(topic).as_ptr(),
486            payload.len() as c_int,payload.as_ptr(),
487            qos as c_int, if retain {1} else {0}
488        )};
489
490        if rc == 0 {
491            Ok(mid as i32)
492        } else {
493            Err(Error::new("publish",rc))
494        }
495    }
496
497    pub fn will_set(&self, topic: &str, payload: &[u8], qos: u32, retain: bool) -> Result<()> {
498        Error::result("will_set",unsafe { mosquitto_will_set(
499            self.mosq, cs(topic).as_ptr(),
500            payload.len() as c_int,payload.as_ptr(),
501            qos as c_int, if retain {1} else {0}
502        )})
503    }
504
505    pub fn will_clear(&self) -> Result<()> {
506        Error::result("will_clear",unsafe {
507            mosquitto_will_clear(self.mosq)
508        })
509    }
510
511    /// publish an MQTT message to the broker, returning message id after waiting for successful publish
512    pub fn publish_wait(&self, topic: &str, payload: &[u8], qos: u32, retain: bool, millis: i32) -> Result<i32> {
513        let our_mid = self.publish(topic,payload,qos,retain)?;
514        let t = Instant::now();
515        let wait = Duration::from_millis(millis as u64);
516        let mut callback = self.callbacks(0);
517        callback.on_publish(|data, mid| {
518            *data = mid;
519        });
520        loop {
521            self.do_loop(millis)?;
522            if callback.data == our_mid {
523                return Ok(our_mid)
524            };
525            if t.elapsed() > wait {
526                break;
527            }
528        }
529        Err(Error::new("publish",MOSQ_ERR_UNKNOWN))
530    }
531
532
533    /// explicitly disconnect from the broker.
534    pub fn disconnect(&self) -> Result<()> {
535        Error::result("disconnect",unsafe {
536            mosquitto_disconnect(self.mosq)
537        })
538    }
539
540    /// process network events for at most `timeout` milliseconds.
541    /// -1 will mean the default, 1000ms.
542    pub fn do_loop(&self, timeout: i32) -> Result<()> {
543        Error::result("do_loop",unsafe {
544            mosquitto_loop(self.mosq,timeout as c_int,1)
545        })
546    }
547
548    /// process network events.
549    /// This will handle intermittent disconnects for you,
550    /// but will return after an explicit [disconnect()](#method.disconnect) call
551    pub fn loop_forever(&self, timeout: i32) -> Result<()> {
552        Error::result("loop_forever",unsafe {
553            mosquitto_loop_forever(self.mosq,timeout as c_int,1)
554        })
555    }
556
557    /// loop forever, but do not regard an explicit disconnect as an error.
558    pub fn loop_until_disconnect(&self, timeout: i32) -> Result<()> {
559       if let Err(e) = self.loop_forever(timeout) {
560            if e.error() == sys::MOSQ_ERR_NO_CONN {
561                Ok(())
562            } else { // errror handling......!
563                Err(e)
564            }
565        } else {
566            Ok(())
567        }
568    }
569
570    /// Set TLS parameters
571    /// `cafile` is a file containing the PEM encoded trusted CA certificate
572    /// `certfile` is a file containing the PEM encoded certificate file for this client.
573    /// `keyfile` is a file containing the PEM encoded private key for this client.
574    /// `password` if the private key is encrypted
575    pub fn tls_set<P1,P2,P3>(&self, cafile: P1, certfile: P2, keyfile: P3, passphrase: Option<&str>) -> Result<()>
576    where P1: AsRef<Path>, P2: AsRef<Path>, P3: AsRef<Path> {
577        Error::result("tls_set",unsafe {
578            // Yes, this is awful
579            let callback = if let Some(passphrase) = passphrase {
580                PASSWORD_PTR = cs(passphrase).into_raw();
581                PASSWORD_SIZE = passphrase.len();
582                true
583            } else {
584                false
585            };
586            mosquitto_tls_set(self.mosq,
587                cpath(cafile.as_ref()).as_ptr(),null() as *const c_char,
588                cpath(certfile.as_ref()).as_ptr(),cpath(keyfile.as_ref()).as_ptr(),
589                if callback {Some(mosq_password_callback)} else {None}
590            )
591        })
592
593    }
594
595    /// Set TLS PSK parameters
596    /// `psk` is the pre-shared-key in hex format with no leading "0x"
597    /// `identity` is the identity of this client. May be used as the username
598    /// `ciphers` is an optional string describing the PSK ciphers available for use
599    pub fn tls_psk_set(&self, psk: &str, identity: &str, ciphers: Option<&str>) -> Result<()> {
600        Error::result("tls_psk_set",unsafe {
601            let cipher;
602            let cipher_ptr = if let Some(ciphers) = ciphers {
603                cipher = cs(ciphers);
604                cipher.as_ptr()
605            } else {
606                null() as *const c_char
607            };
608            mosquitto_tls_psk_set(self.mosq,cs(psk).as_ptr(),cs(identity).as_ptr(),cipher_ptr)
609        })
610    }
611
612
613}
614
615static mut PASSWORD_PTR: *const c_char = 0 as *const c_char;
616static mut PASSWORD_SIZE: usize = 0;
617
618use std::ptr;
619
620extern fn mosq_password_callback(buf: *mut c_char, _size: c_int, _rwflag: c_int, _userdata: *mut Data)->c_int {
621    unsafe {
622        ptr::copy(PASSWORD_PTR, buf, PASSWORD_SIZE+1);
623        PASSWORD_SIZE as c_int
624    }
625}
626
627// mosquitto is thread-safe, so let's tell Rust about it
628unsafe impl Send for Mosquitto {}
629unsafe impl Sync for Mosquitto {}
630
631// important that clones do not own the underlying pointer
632// and try to free it!
633impl Clone for Mosquitto {
634    fn clone(&self) -> Mosquitto {
635        Mosquitto{
636            mosq: self.mosq,
637            owned: false
638        }
639    }
640}
641
642impl Drop for Mosquitto {
643    fn drop(&mut self) {
644        // eprintln!("Mosquitto drop {}",self.owned);
645        if self.owned {
646            unsafe { mosquitto_destroy(self.mosq); }
647            // the last person to leave the building must turn off the lights
648            if INSTANCES.fetch_sub(1, Ordering::SeqCst) == 1 {
649                // eprintln!("clean up mosq");
650                unsafe {mosquitto_lib_init();}
651            }
652        }
653    }
654}
655
656/// Handling mosquitto callbacks.
657/// This will pass a mutable reference to the
658/// contained data to the callbacks.
659pub struct Callbacks<'a,T> {
660    message_callback: Option<Box<Fn(&mut T,MosqMessage) + 'a>>,
661    connect_callback: Option<Box<Fn(&mut T,i32) + 'a>>,
662    publish_callback: Option<Box<Fn(&mut T,i32) + 'a>>,
663    subscribe_callback: Option<Box<Fn(&mut T,i32) + 'a>>,
664    unsubscribe_callback: Option<Box<Fn(&mut T,i32) + 'a>>,
665    disconnect_callback: Option<Box<Fn(&mut T,i32) + 'a>>,
666    log_callback: Option<Box<Fn(&mut T,u32,&str) + 'a>>,
667    mosq: &'a Mosquitto,
668    init: bool,
669    pub data: T,
670}
671
672impl <'a,T> Callbacks<'a,T> {
673
674    /// create a new callback handler with data.
675    /// Initialize with an existing Mosquitto reference.
676    pub fn new(mosq: &Mosquitto, data: T) -> Callbacks<T> {
677        Callbacks {
678            message_callback: None,
679            connect_callback: None,
680            publish_callback: None,
681            subscribe_callback: None,
682            unsubscribe_callback: None,
683            disconnect_callback: None,
684            log_callback: None,
685            mosq: mosq,
686            init: false,
687            data: data
688        }
689    }
690
691    /// a reference to the Mosquitto instance
692    pub fn mosq(&self) -> &Mosquitto {
693        self.mosq
694    }
695
696    fn initialize(&mut self) {
697        if ! self.init {
698            self.init = true;
699            let pdata: *const Callbacks<T> = &*self;
700            unsafe {
701                mosquitto_user_data_set(self.mosq.mosq, pdata as *const Data);
702            };
703        }
704    }
705
706    /// provide a closure which will be called when messages arrive.
707    /// You are passed a mutable reference to data and the message
708    pub fn on_message<C: Fn(&mut T,MosqMessage) + 'a>(&mut self, callback: C) {
709        self.initialize();
710        unsafe {mosquitto_message_callback_set(self.mosq.mosq,mosq_message_callback::<T>);}
711        self.message_callback = Some(Box::new(callback));
712    }
713
714    /// provide a closure which is called when connection happens.
715    /// You are passed a mutable reference to data and the status.
716    pub fn on_connect<C: Fn(&mut T,i32) + 'a>(&mut self, callback: C) {
717        self.initialize();
718        unsafe {mosquitto_connect_callback_set(self.mosq.mosq,mosq_connect_callback::<T>);}
719        self.connect_callback = Some(Box::new(callback));
720    }
721
722    /// provide a closure which is called after publishing a message.
723    /// You are passed a mutable reference to data and the message id.
724    pub fn on_publish<C: Fn(&mut T,i32) + 'a>(&mut self, callback: C) {
725        self.initialize();
726        unsafe {mosquitto_publish_callback_set(self.mosq.mosq,mosq_publish_callback::<T>);}
727        self.publish_callback = Some(Box::new(callback));
728    }
729
730    /// provide a closure which is called after subscribing.
731    /// You are passed a mutable reference to data and the subscription id.
732    pub fn on_subscribe<C: Fn(&mut T,i32) + 'a>(&mut self, callback: C) {
733        self.initialize();
734        unsafe {mosquitto_subscribe_callback_set(self.mosq.mosq,mosq_subscribe_callback::<T>);}
735        self.subscribe_callback = Some(Box::new(callback));
736    }
737
738    /// provide a closure which is called after unsubscribing from a topic
739    /// You are passed a mutable reference to data and the subscription id.
740    pub fn on_unsubscribe<C: Fn(&mut T,i32) + 'a>(&mut self, callback: C) {
741        self.initialize();
742        unsafe {mosquitto_unsubscribe_callback_set(self.mosq.mosq,mosq_unsubscribe_callback::<T>);}
743        self.unsubscribe_callback = Some(Box::new(callback));
744    }
745
746    /// provide a closure which is called when client disconnects from broker.
747    /// You are passed a mutable reference to data and ....
748    pub fn on_disconnect<C: Fn(&mut T,i32) + 'a>(&mut self, callback: C) {
749        self.initialize();
750        unsafe {mosquitto_disconnect_callback_set(self.mosq.mosq,mosq_disconnect_callback::<T>);}
751        self.disconnect_callback = Some(Box::new(callback));
752    }
753
754    /// provide a closure which is called for each log message
755    /// You are passed a mutable reference to data, a logging level,
756    /// and the text of the log message
757    pub fn on_log<C: Fn(&mut T,u32,&str) + 'a>(&mut self, callback: C) {
758        self.initialize();
759        unsafe {mosquitto_log_callback_set(self.mosq.mosq,mosq_log_callback::<T>);}
760        self.log_callback = Some(Box::new(callback));
761    }
762
763}
764
765impl <'a,T>Drop for Callbacks<'a,T> {
766    fn drop(&mut self) {
767        unsafe {
768            mosquitto_user_data_set(self.mosq.mosq, null() as *const Data);
769        }
770    }
771}
772
773
// Turn the user data pointer given to a C callback back into a mutable
// reference to our `Callbacks<T>`.
// Done with a macro (surprisingly hard to write as a function)
macro_rules! callback_ref {
    ($data:expr, $T:ident) => {
        unsafe { &mut *($data as *mut Callbacks<$T>) }
    };
}
781
782extern fn mosq_connect_callback<T>(_: *const Mosq, data: *mut Data, rc: c_int) {
783    if data.is_null() { return; }
784    let this = callback_ref!(data,T);
785    if let Some(ref callback) = this.connect_callback {
786        callback(&mut this.data, rc as i32);
787    }
788}
789
790extern fn mosq_publish_callback<T>(_: *const Mosq, data: *mut Data, rc: c_int) {
791    if data.is_null() { return; }
792    let this = callback_ref!(data,T);
793    if let Some(ref callback) = this.publish_callback {
794        callback(&mut this.data, rc as i32);
795    }
796}
797
798extern fn mosq_message_callback<T>(_: *const Mosq, data: *mut Data, message: *const Message) {
799    if data.is_null() { return; }
800    let this = callback_ref!(data,T);
801    //println!("msg {:?}", unsafe {&*message});
802    if let Some(ref callback) = this.message_callback {
803        callback(&mut this.data, MosqMessage::new(message,false));
804    }
805}
806
807extern fn mosq_subscribe_callback<T>(_: *const Mosq, data: *mut Data, rc: c_int) {
808    if data.is_null() { return; }
809    let this = callback_ref!(data,T);
810    if let Some(ref callback) = this.subscribe_callback {
811        callback(&mut this.data, rc as i32);
812    }
813}
814
815extern fn mosq_unsubscribe_callback<T>(_: *const Mosq, data: *mut Data, rc: c_int) {
816    if data.is_null() { return; }
817    let this = callback_ref!(data,T);
818    if let Some(ref callback) = this.unsubscribe_callback {
819        callback(&mut this.data, rc as i32);
820    }
821}
822
823extern fn mosq_disconnect_callback<T>(_: *const Mosq, data: *mut Data, rc: c_int) {
824    if data.is_null() { return; }
825    let this = callback_ref!(data,T);
826    if let Some(ref callback) = this.disconnect_callback {
827        callback(&mut this.data, rc as i32);
828    }
829}
830
831extern fn mosq_log_callback<T>(_: *const Mosq, data: *mut Data, level: c_int, text: *const c_char) {
832    if data.is_null() { return; }
833    let this = callback_ref!(data,T);
834    let text = unsafe { CStr::from_ptr(text).to_str().expect("log text was not UTF-8")  };
835    if let Some(ref callback) = this.log_callback {
836        callback(&mut this.data, level as u32, text);
837    }
838}
839