1use std::os::raw::{c_int,c_char};
114use std::ffi::{CStr,CString};
115use std::error;
116use std::fmt;
117use std::path::Path;
118use std::time::{Duration,Instant};
119use std::fmt::{Display,Debug};
120use std::ptr::null;
121use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
122
123static INSTANCES: AtomicUsize = ATOMIC_USIZE_INIT;
124
125pub mod sys;
126
127use sys::*;
128
129#[derive(Debug)]
132pub struct Error {
133 text: String,
134 errcode: i32,
135 connect: bool,
136}
137
138impl Display for Error {
139 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
140 write!(f,"{}",self.text)
141 }
142}
143
144pub type Result<T> = ::std::result::Result<T,Error>;
145
146impl Error {
147 pub fn new(msg: &str, rc: c_int) -> Error {
149 Error{text: format!("{}: {}",msg,mosq_strerror(rc)), errcode: rc, connect: false}
150 }
151
152 pub fn new_connect(rc: c_int) -> Error {
154 Error{text: connect_error(rc).into(), errcode: rc, connect: true}
155 }
156
157 fn result(call: &str, rc: c_int) -> Result<()> {
158 if rc != 0 {
159 Err(Error::new(call,rc))
160 } else {
161 Ok(())
162 }
163 }
164
165 pub fn error(&self) -> i32 {
167 self.errcode
168 }
169}
170
171impl error::Error for Error {
172 fn description(&self) -> &str {
173 &self.text
174 }
175}
176
177fn cs(s: &str) -> CString {
178 CString::new(s).expect("Text contained nul bytes")
179}
180
181fn cpath(p: &Path) -> CString {
183 cs(p.to_str().expect("Non UTF-8 filename"))
184}
185
186pub struct MosqMessage {
188 msg: *const Message,
189 owned: bool
190}
191
192use std::mem;
193
194#[link(name = "c")]
195extern {
196 fn malloc(size: usize) -> *mut u8;
197}
198
199impl MosqMessage {
200 fn new(msg: *const Message, clone: bool) -> MosqMessage {
201 if clone {
202 unsafe {
203 let m = malloc(mem::size_of::<Message>()) as *mut Message;
204 mosquitto_message_copy(m,msg);
205 MosqMessage{msg:m,owned:true}
206 }
207 } else {
208 MosqMessage{msg:msg, owned:false}
209 }
210 }
211
212 fn msg_ref(&self) -> &Message {
213 unsafe { &*self.msg }
214 }
215
216 pub fn topic(&self) -> &str {
219 unsafe { CStr::from_ptr(self.msg_ref().topic).to_str().expect("Topic was not UTF-8") }
220 }
221
222 pub fn payload(&self) -> &[u8] {
224 let msg = self.msg_ref();
225 unsafe {
226 ::std::slice::from_raw_parts(
227 msg.payload,
228 msg.payloadlen as usize
229 )
230 }
231 }
232
233 pub fn text(&self) -> &str {
236 ::std::str::from_utf8(self.payload()).expect("Payload was not UTF-8")
237 }
238
239 pub fn qos(&self) -> u32 {
242 self.msg_ref().qos as u32
243 }
244
245 pub fn retained(&self) -> bool {
249 if self.msg_ref().retain > 0 {true} else {false}
250 }
251}
252
253impl Debug for MosqMessage {
254 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
255 let this = self.msg_ref();
256 write!(f,"{}: mid {} len {} qos {} retain {}",self.topic(),
257 this.mid, this.payloadlen, this.qos, this.retain)
258 }
259}
260
261impl Clone for MosqMessage {
262 fn clone(&self) -> Self {
263 MosqMessage::new(self.msg,true)
264 }
265}
266
267impl Drop for MosqMessage {
268 fn drop(&mut self) {
269 if self.owned {
271 unsafe { mosquitto_message_free(&self.msg) };
272 }
273 }
274}
275
276pub struct TopicMatcher<'a> {
279 sub: CString,
280 pub mid: i32,
282 mosq: &'a Mosquitto,
283}
284
285impl <'a>TopicMatcher<'a> {
286 fn new(sub: CString, mid: i32, mosq: &'a Mosquitto) -> TopicMatcher<'a> {
287 TopicMatcher{sub: sub, mid: mid, mosq: mosq}
288 }
289
290 pub fn matches(&self, msg: &MosqMessage) -> bool {
292 let mut matched: u8 = 0;
293 unsafe {
294 mosquitto_topic_matches_sub(self.sub.as_ptr(),msg.msg_ref().topic, &mut matched);
295 }
296 if matched > 0 {true} else {false}
297 }
298
299 fn receive(&self, millis: i32, just_one: bool) -> Result<Vec<MosqMessage>> {
300 let t = Instant::now();
301 let wait = Duration::from_millis(millis as u64);
302 let mut mc = self.mosq.callbacks(Vec::new());
303 mc.on_message(|data,msg| {
304 if self.matches(&msg) {
305 data.push(MosqMessage::new(msg.msg,true));
306 }
307 });
308
309 while t.elapsed() < wait {
310 self.mosq.do_loop(millis)?;
311 if just_one && mc.data.len() > 0 {
312 break;
313 }
314 }
315
316 if mc.data.len() > 0 { let mut res = Vec::new();
319 ::std::mem::swap(&mut mc.data, &mut res);
320 Ok(res)
321 } else { Err(Error::new("receive",MOSQ_ERR_TIMEOUT))
323 }
324
325 }
326
327 pub fn receive_many(&self, millis: i32) -> Result<Vec<MosqMessage>> {
329 self.receive(millis,false)
330 }
331
332
333 pub fn receive_one(&self, millis: i32) -> Result<MosqMessage> {
335 self.receive(millis,true).map(|mut v| v.remove(0))
336 }
337}
338
339pub struct Version {
341 pub major: u32,
342 pub minor: u32,
343 pub revision: u32
344}
345
346impl Display for Version {
347 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
348 write!(f,"{}.{}.{}",self.minor,self.major,self.revision)
349 }
350}
351
352pub fn version() -> Version {
354 let mut major: c_int = 0;
355 let mut minor: c_int = 0;
356 let mut revision: c_int = 0;
357
358 unsafe { mosquitto_lib_version(&mut major,&mut minor,&mut revision); }
359
360 Version{major: major as u32,minor: minor as u32,revision: revision as u32}
361}
362
363pub struct Mosquitto {
365 mosq: *const Mosq,
366 owned: bool,
367}
368
369impl Mosquitto {
370
371 pub fn new(id: &str) -> Mosquitto {
374 Mosquitto::new_session(id, true)
375 }
376
377 pub fn new_session(id: &str, clean_session: bool) -> Mosquitto {
380 if INSTANCES.fetch_add(1, Ordering::SeqCst) == 0 {
381 unsafe { mosquitto_lib_init(); }
383 }
384 let mosq = unsafe {
385 mosquitto_new(cs(id).as_ptr(),if clean_session {1} else {0},null())
386 };
387 Mosquitto{
388 mosq: mosq,
389 owned: true
390 }
391 }
392
393 pub fn callbacks<'a,T>(&'a self, data: T) -> Callbacks<'a,T> {
395 Callbacks::new(self,data)
396 }
397
398 pub fn connect(&self, host: &str, port: u32) -> Result<()> {
402 Error::result("connect",unsafe {
403 mosquitto_connect(self.mosq,cs(host).as_ptr(),port as c_int,0)
404 })
405 }
406
407 pub fn connect_wait(&self, host: &str, port: u32, millis: i32) -> Result<()> {
409 self.connect(host,port)?;
410 let t = Instant::now();
411 let wait = Duration::from_millis(millis as u64);
412 let mut callback = self.callbacks(MOSQ_CONNECT_ERR_TIMEOUT);
413 callback.on_connect(|data, rc| {
414 *data = rc;
415 });
416 loop {
417 self.do_loop(millis)?;
418 if callback.data == MOSQ_CONNECT_ERR_OK {
419 return Ok(())
420 };
421 if t.elapsed() > wait {
422 break;
423 }
424 }
425 Err(Error::new_connect(callback.data))
426
427 }
428
429 pub fn threaded(&self) {
431 unsafe { mosquitto_threaded_set(self.mosq,1); }
432 }
433
434 pub fn reconnect(&self) -> Result<()> {
436 Error::result("reconnect",unsafe {
437 mosquitto_reconnect(self.mosq)
438 })
439 }
440
441 pub fn reconnect_delay_set(&self,delay: u32, delay_max: u32, exponential_backoff: bool) -> Result<()> {
442 Error::result("delay_set",unsafe {
443 mosquitto_reconnect_delay_set(self.mosq,
444 delay as c_int,
445 delay_max as c_int,
446 exponential_backoff as u8
447 )})
448
449 }
450
451 pub fn subscribe<'a>(&'a self, sub: &str, qos: u32) -> Result<TopicMatcher<'a>> {
457 let mut mid: c_int = 0;
458 let sub = cs(sub);
459 let rc = unsafe { mosquitto_subscribe(self.mosq,&mut mid,sub.as_ptr(),qos as c_int) };
460 if rc == 0 {
461 Ok(TopicMatcher::new(sub,mid,self))
462 } else {
463 Err(Error::new("subscribe",rc))
464 }
465 }
466
467 pub fn unsubscribe(&self, sub: &str) -> Result<i32> {
469 let mut mid = 0;
470 let rc = unsafe { mosquitto_unsubscribe(self.mosq,&mut mid, cs(sub).as_ptr()) };
471 if rc == 0 {
472 Ok(mid as i32)
473 } else {
474 Err(Error::new("unsubscribe",rc))
475 }
476 }
477
478 pub fn publish(&self, topic: &str, payload: &[u8], qos: u32, retain: bool) -> Result<i32> {
482 let mut mid = 0;
483
484 let rc = unsafe { mosquitto_publish(
485 self.mosq,&mut mid, cs(topic).as_ptr(),
486 payload.len() as c_int,payload.as_ptr(),
487 qos as c_int, if retain {1} else {0}
488 )};
489
490 if rc == 0 {
491 Ok(mid as i32)
492 } else {
493 Err(Error::new("publish",rc))
494 }
495 }
496
497 pub fn will_set(&self, topic: &str, payload: &[u8], qos: u32, retain: bool) -> Result<()> {
498 Error::result("will_set",unsafe { mosquitto_will_set(
499 self.mosq, cs(topic).as_ptr(),
500 payload.len() as c_int,payload.as_ptr(),
501 qos as c_int, if retain {1} else {0}
502 )})
503 }
504
505 pub fn will_clear(&self) -> Result<()> {
506 Error::result("will_clear",unsafe {
507 mosquitto_will_clear(self.mosq)
508 })
509 }
510
511 pub fn publish_wait(&self, topic: &str, payload: &[u8], qos: u32, retain: bool, millis: i32) -> Result<i32> {
513 let our_mid = self.publish(topic,payload,qos,retain)?;
514 let t = Instant::now();
515 let wait = Duration::from_millis(millis as u64);
516 let mut callback = self.callbacks(0);
517 callback.on_publish(|data, mid| {
518 *data = mid;
519 });
520 loop {
521 self.do_loop(millis)?;
522 if callback.data == our_mid {
523 return Ok(our_mid)
524 };
525 if t.elapsed() > wait {
526 break;
527 }
528 }
529 Err(Error::new("publish",MOSQ_ERR_UNKNOWN))
530 }
531
532
533 pub fn disconnect(&self) -> Result<()> {
535 Error::result("disconnect",unsafe {
536 mosquitto_disconnect(self.mosq)
537 })
538 }
539
540 pub fn do_loop(&self, timeout: i32) -> Result<()> {
543 Error::result("do_loop",unsafe {
544 mosquitto_loop(self.mosq,timeout as c_int,1)
545 })
546 }
547
548 pub fn loop_forever(&self, timeout: i32) -> Result<()> {
552 Error::result("loop_forever",unsafe {
553 mosquitto_loop_forever(self.mosq,timeout as c_int,1)
554 })
555 }
556
557 pub fn loop_until_disconnect(&self, timeout: i32) -> Result<()> {
559 if let Err(e) = self.loop_forever(timeout) {
560 if e.error() == sys::MOSQ_ERR_NO_CONN {
561 Ok(())
562 } else { Err(e)
564 }
565 } else {
566 Ok(())
567 }
568 }
569
570 pub fn tls_set<P1,P2,P3>(&self, cafile: P1, certfile: P2, keyfile: P3, passphrase: Option<&str>) -> Result<()>
576 where P1: AsRef<Path>, P2: AsRef<Path>, P3: AsRef<Path> {
577 Error::result("tls_set",unsafe {
578 let callback = if let Some(passphrase) = passphrase {
580 PASSWORD_PTR = cs(passphrase).into_raw();
581 PASSWORD_SIZE = passphrase.len();
582 true
583 } else {
584 false
585 };
586 mosquitto_tls_set(self.mosq,
587 cpath(cafile.as_ref()).as_ptr(),null() as *const c_char,
588 cpath(certfile.as_ref()).as_ptr(),cpath(keyfile.as_ref()).as_ptr(),
589 if callback {Some(mosq_password_callback)} else {None}
590 )
591 })
592
593 }
594
595 pub fn tls_psk_set(&self, psk: &str, identity: &str, ciphers: Option<&str>) -> Result<()> {
600 Error::result("tls_psk_set",unsafe {
601 let cipher;
602 let cipher_ptr = if let Some(ciphers) = ciphers {
603 cipher = cs(ciphers);
604 cipher.as_ptr()
605 } else {
606 null() as *const c_char
607 };
608 mosquitto_tls_psk_set(self.mosq,cs(psk).as_ptr(),cs(identity).as_ptr(),cipher_ptr)
609 })
610 }
611
612
613}
614
615static mut PASSWORD_PTR: *const c_char = 0 as *const c_char;
616static mut PASSWORD_SIZE: usize = 0;
617
618use std::ptr;
619
620extern fn mosq_password_callback(buf: *mut c_char, _size: c_int, _rwflag: c_int, _userdata: *mut Data)->c_int {
621 unsafe {
622 ptr::copy(PASSWORD_PTR, buf, PASSWORD_SIZE+1);
623 PASSWORD_SIZE as c_int
624 }
625}
626
627unsafe impl Send for Mosquitto {}
629unsafe impl Sync for Mosquitto {}
630
631impl Clone for Mosquitto {
634 fn clone(&self) -> Mosquitto {
635 Mosquitto{
636 mosq: self.mosq,
637 owned: false
638 }
639 }
640}
641
642impl Drop for Mosquitto {
643 fn drop(&mut self) {
644 if self.owned {
646 unsafe { mosquitto_destroy(self.mosq); }
647 if INSTANCES.fetch_sub(1, Ordering::SeqCst) == 1 {
649 unsafe {mosquitto_lib_init();}
651 }
652 }
653 }
654}
655
656pub struct Callbacks<'a,T> {
660 message_callback: Option<Box<Fn(&mut T,MosqMessage) + 'a>>,
661 connect_callback: Option<Box<Fn(&mut T,i32) + 'a>>,
662 publish_callback: Option<Box<Fn(&mut T,i32) + 'a>>,
663 subscribe_callback: Option<Box<Fn(&mut T,i32) + 'a>>,
664 unsubscribe_callback: Option<Box<Fn(&mut T,i32) + 'a>>,
665 disconnect_callback: Option<Box<Fn(&mut T,i32) + 'a>>,
666 log_callback: Option<Box<Fn(&mut T,u32,&str) + 'a>>,
667 mosq: &'a Mosquitto,
668 init: bool,
669 pub data: T,
670}
671
672impl <'a,T> Callbacks<'a,T> {
673
674 pub fn new(mosq: &Mosquitto, data: T) -> Callbacks<T> {
677 Callbacks {
678 message_callback: None,
679 connect_callback: None,
680 publish_callback: None,
681 subscribe_callback: None,
682 unsubscribe_callback: None,
683 disconnect_callback: None,
684 log_callback: None,
685 mosq: mosq,
686 init: false,
687 data: data
688 }
689 }
690
691 pub fn mosq(&self) -> &Mosquitto {
693 self.mosq
694 }
695
696 fn initialize(&mut self) {
697 if ! self.init {
698 self.init = true;
699 let pdata: *const Callbacks<T> = &*self;
700 unsafe {
701 mosquitto_user_data_set(self.mosq.mosq, pdata as *const Data);
702 };
703 }
704 }
705
706 pub fn on_message<C: Fn(&mut T,MosqMessage) + 'a>(&mut self, callback: C) {
709 self.initialize();
710 unsafe {mosquitto_message_callback_set(self.mosq.mosq,mosq_message_callback::<T>);}
711 self.message_callback = Some(Box::new(callback));
712 }
713
714 pub fn on_connect<C: Fn(&mut T,i32) + 'a>(&mut self, callback: C) {
717 self.initialize();
718 unsafe {mosquitto_connect_callback_set(self.mosq.mosq,mosq_connect_callback::<T>);}
719 self.connect_callback = Some(Box::new(callback));
720 }
721
722 pub fn on_publish<C: Fn(&mut T,i32) + 'a>(&mut self, callback: C) {
725 self.initialize();
726 unsafe {mosquitto_publish_callback_set(self.mosq.mosq,mosq_publish_callback::<T>);}
727 self.publish_callback = Some(Box::new(callback));
728 }
729
730 pub fn on_subscribe<C: Fn(&mut T,i32) + 'a>(&mut self, callback: C) {
733 self.initialize();
734 unsafe {mosquitto_subscribe_callback_set(self.mosq.mosq,mosq_subscribe_callback::<T>);}
735 self.subscribe_callback = Some(Box::new(callback));
736 }
737
738 pub fn on_unsubscribe<C: Fn(&mut T,i32) + 'a>(&mut self, callback: C) {
741 self.initialize();
742 unsafe {mosquitto_unsubscribe_callback_set(self.mosq.mosq,mosq_unsubscribe_callback::<T>);}
743 self.unsubscribe_callback = Some(Box::new(callback));
744 }
745
746 pub fn on_disconnect<C: Fn(&mut T,i32) + 'a>(&mut self, callback: C) {
749 self.initialize();
750 unsafe {mosquitto_disconnect_callback_set(self.mosq.mosq,mosq_disconnect_callback::<T>);}
751 self.disconnect_callback = Some(Box::new(callback));
752 }
753
754 pub fn on_log<C: Fn(&mut T,u32,&str) + 'a>(&mut self, callback: C) {
758 self.initialize();
759 unsafe {mosquitto_log_callback_set(self.mosq.mosq,mosq_log_callback::<T>);}
760 self.log_callback = Some(Box::new(callback));
761 }
762
763}
764
765impl <'a,T>Drop for Callbacks<'a,T> {
766 fn drop(&mut self) {
767 unsafe {
768 mosquitto_user_data_set(self.mosq.mosq, null() as *const Data);
769 }
770 }
771}
772
773
774macro_rules! callback_ref {
776 ($data:expr,$T:ident) =>
777 {
778 unsafe {&mut *($data as *mut Callbacks<$T>)}
779 }
780}
781
782extern fn mosq_connect_callback<T>(_: *const Mosq, data: *mut Data, rc: c_int) {
783 if data.is_null() { return; }
784 let this = callback_ref!(data,T);
785 if let Some(ref callback) = this.connect_callback {
786 callback(&mut this.data, rc as i32);
787 }
788}
789
790extern fn mosq_publish_callback<T>(_: *const Mosq, data: *mut Data, rc: c_int) {
791 if data.is_null() { return; }
792 let this = callback_ref!(data,T);
793 if let Some(ref callback) = this.publish_callback {
794 callback(&mut this.data, rc as i32);
795 }
796}
797
798extern fn mosq_message_callback<T>(_: *const Mosq, data: *mut Data, message: *const Message) {
799 if data.is_null() { return; }
800 let this = callback_ref!(data,T);
801 if let Some(ref callback) = this.message_callback {
803 callback(&mut this.data, MosqMessage::new(message,false));
804 }
805}
806
807extern fn mosq_subscribe_callback<T>(_: *const Mosq, data: *mut Data, rc: c_int) {
808 if data.is_null() { return; }
809 let this = callback_ref!(data,T);
810 if let Some(ref callback) = this.subscribe_callback {
811 callback(&mut this.data, rc as i32);
812 }
813}
814
815extern fn mosq_unsubscribe_callback<T>(_: *const Mosq, data: *mut Data, rc: c_int) {
816 if data.is_null() { return; }
817 let this = callback_ref!(data,T);
818 if let Some(ref callback) = this.unsubscribe_callback {
819 callback(&mut this.data, rc as i32);
820 }
821}
822
823extern fn mosq_disconnect_callback<T>(_: *const Mosq, data: *mut Data, rc: c_int) {
824 if data.is_null() { return; }
825 let this = callback_ref!(data,T);
826 if let Some(ref callback) = this.disconnect_callback {
827 callback(&mut this.data, rc as i32);
828 }
829}
830
831extern fn mosq_log_callback<T>(_: *const Mosq, data: *mut Data, level: c_int, text: *const c_char) {
832 if data.is_null() { return; }
833 let this = callback_ref!(data,T);
834 let text = unsafe { CStr::from_ptr(text).to_str().expect("log text was not UTF-8") };
835 if let Some(ref callback) = this.log_callback {
836 callback(&mut this.data, level as u32, text);
837 }
838}
839