use std::os::raw::{c_int, c_char, c_uint, c_void};
use std::ffi::{CStr, CString};
use std::error;
use std::fmt;
use std::path::Path;
use std::time::{Duration, Instant};
use std::fmt::{Display, Debug};
use std::ptr::{null, null_mut};
use std::sync::atomic::{AtomicUsize, Ordering};
// Count of owning `Mosquitto` handles: bumped in `Mosquitto::new_session` and
// decremented when an owning handle is dropped. The 0 -> 1 transition triggers
// library initialisation.
static INSTANCES: AtomicUsize = AtomicUsize::new(0);
pub mod sys;
use sys::*;
/// Error type returned by every fallible operation in this module.
#[derive(Debug)]
pub struct Error {
    // Human-readable message; this is what `Display` prints.
    text: String,
    // Raw numeric code the message was built from.
    errcode: i32,
    // Set by `new_connect`, cleared by `new`.
    connect: bool,
}

impl Display for Error {
    /// Writes the stored message text verbatim.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(&self.text)
    }
}
/// Result alias used throughout this module, with this module's `Error` as the error type.
pub type Result<T> = ::std::result::Result<T, Error>;
impl Error {
pub fn new(msg: &str, rc: c_int) -> Error {
Error { text: format!("{}: {}", msg, mosq_strerror(rc)), errcode: rc, connect: false }
}
pub fn new_connect(rc: c_int) -> Error {
Error { text: connect_error(rc).into(), errcode: rc, connect: true }
}
fn result(call: &str, rc: c_int) -> Result<()> {
if rc != 0 {
Err(Error::new(call, rc))
} else {
Ok(())
}
}
pub fn error(&self) -> i32 {
self.errcode
}
}
impl error::Error for Error {
    /// Returns the same message text that `Display` prints.
    fn description(&self) -> &str {
        self.text.as_str()
    }
}
/// Converts a Rust string into an owned, NUL-terminated C string.
///
/// # Panics
/// Panics if `s` contains an interior NUL byte.
fn cs(s: &str) -> CString {
    let converted = CString::new(s);
    converted.expect("Text contained nul bytes")
}
/// Converts a filesystem path into a C string via `cs`.
///
/// # Panics
/// Panics if the path is not valid UTF-8 (and, through `cs`, if it contains a NUL byte).
fn cpath(p: &Path) -> CString {
    let utf8 = p.to_str().expect("Non UTF-8 filename");
    cs(utf8)
}
/// Handle to a libmosquitto `Message`, either borrowed or an owned copy.
pub struct MosqMessage {
// Raw pointer to the underlying message structure.
msg: *const Message,
// True when `msg` was produced by `mosquitto_message_copy` in `MosqMessage::new`;
// only then is it released in `Drop`.
owned: bool,
}
use std::mem;
// Binding to the C allocator, used by `MosqMessage::new` to allocate the
// destination of `mosquitto_message_copy`.
// NOTE(review): presumably C `malloc` is used so that `mosquitto_message_free`
// can release the block — confirm against libmosquitto.
#[link(name = "c")]
extern {
fn malloc(size: usize) -> *mut u8;
}
impl MosqMessage {
    /// Wraps a raw message pointer.
    ///
    /// With `clone == true` a fresh `Message` is allocated with `malloc` and filled
    /// by `mosquitto_message_copy`; the result is marked as owned and freed on drop.
    /// Otherwise the pointer is merely borrowed.
    fn new(msg: *const Message, clone: bool) -> MosqMessage {
        if clone {
            unsafe {
                let m = malloc(mem::size_of::<Message>()) as *mut Message;
                mosquitto_message_copy(m, msg);
                MosqMessage { msg: m, owned: true }
            }
        } else {
            MosqMessage { msg: msg, owned: false }
        }
    }

    /// Borrows the underlying message structure.
    fn msg_ref(&self) -> &Message {
        unsafe { &*self.msg }
    }

    /// The topic as text.
    ///
    /// # Panics
    /// Panics if the topic is not valid UTF-8.
    pub fn topic(&self) -> &str {
        unsafe { CStr::from_ptr(self.msg_ref().topic).to_str().expect("Topic was not UTF-8") }
    }

    /// The payload as raw bytes; empty when the message carries no payload.
    pub fn payload(&self) -> &[u8] {
        let msg = self.msg_ref();
        let data = msg.payload as *const u8;
        // `slice::from_raw_parts` requires a non-null pointer even for a length
        // of zero, so a missing or empty payload must not reach it.
        if data.is_null() || msg.payloadlen <= 0 {
            return &[];
        }
        // SAFETY: `data` is non-null and the message reports `payloadlen` bytes
        // behind it; the slice borrows `self`, so it cannot outlive the handle.
        unsafe { ::std::slice::from_raw_parts(data, msg.payloadlen as usize) }
    }

    /// The payload as text.
    ///
    /// # Panics
    /// Panics if the payload is not valid UTF-8.
    pub fn text(&self) -> &str {
        ::std::str::from_utf8(self.payload()).expect("Payload was not UTF-8")
    }

    /// The `qos` field of the message.
    pub fn qos(&self) -> u32 {
        self.msg_ref().qos as u32
    }

    /// The `retain` flag of the message.
    pub fn retained(&self) -> bool {
        self.msg_ref().retain
    }
}
impl Debug for MosqMessage {
    /// Formats as `<topic>: mid <mid> len <payloadlen> qos <qos> retain <retain>`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let raw = self.msg_ref();
        let topic = self.topic();
        write!(
            f,
            "{}: mid {} len {} qos {} retain {}",
            topic, raw.mid, raw.payloadlen, raw.qos, raw.retain
        )
    }
}
impl Clone for MosqMessage {
    /// Produces an owned copy of the underlying message, regardless of whether
    /// `self` owns its message.
    fn clone(&self) -> Self {
        let make_owned_copy = true;
        MosqMessage::new(self.msg, make_owned_copy)
    }
}
impl Drop for MosqMessage {
fn drop(&mut self) {
if self.owned {
unsafe { mosquitto_message_free(&mut (self.msg as *mut _)) };
}
}
}
/// Result of a successful `Mosquitto::subscribe`: remembers the subscription
/// pattern so incoming messages can be matched against it.
pub struct TopicMatcher<'a> {
// The subscription pattern, kept as a C string for `mosquitto_topic_matches_sub`.
sub: CString,
/// Message id written by `mosquitto_subscribe` for this subscription.
pub mid: i32,
// The client the subscription was made on.
mosq: &'a Mosquitto,
}
impl<'a> TopicMatcher<'a> {
fn new(sub: CString, mid: i32, mosq: &'a Mosquitto) -> TopicMatcher<'a> {
TopicMatcher { sub: sub, mid: mid, mosq: mosq }
}
pub fn matches(&self, msg: &MosqMessage) -> bool {
let mut matched = false;
unsafe {
mosquitto_topic_matches_sub(self.sub.as_ptr(), msg.msg_ref().topic, &mut matched);
}
matched
}
fn receive(&self, millis: i32, just_one: bool) -> Result<Vec<MosqMessage>> {
let t = Instant::now();
let wait = Duration::from_millis(millis as u64);
let mut mc = self.mosq.callbacks(Vec::new());
mc.on_message(|data, msg| {
if self.matches(&msg) {
data.push(MosqMessage::new(msg.msg, true));
}
});
while t.elapsed() < wait {
self.mosq.do_loop(millis)?;
if just_one && mc.data.len() > 0 {
break;
}
}
if mc.data.len() > 0 { let mut res = Vec::new();
::std::mem::swap(&mut mc.data, &mut res);
Ok(res)
} else { Err(Error::new("receive", MOSQ_ERR_TIMEOUT))
}
}
pub fn receive_many(&self, millis: i32) -> Result<Vec<MosqMessage>> {
self.receive(millis, false)
}
pub fn receive_one(&self, millis: i32) -> Result<MosqMessage> {
self.receive(millis, true).map(|mut v| v.remove(0))
}
}
/// Library version triple, as filled in by `version()`.
pub struct Version {
    /// Major version number.
    pub major: u32,
    /// Minor version number.
    pub minor: u32,
    /// Revision number.
    pub revision: u32,
}

impl Display for Version {
    /// Formats as `major.minor.revision`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}.{}.{}", self.major, self.minor, self.revision)
    }
}
/// Queries `mosquitto_lib_version` and returns the reported version triple.
pub fn version() -> Version {
    let (mut major, mut minor, mut revision): (c_int, c_int, c_int) = (0, 0, 0);
    unsafe {
        mosquitto_lib_version(&mut major, &mut minor, &mut revision);
    }
    Version {
        major: major as u32,
        minor: minor as u32,
        revision: revision as u32,
    }
}
/// A client handle wrapping a raw `Mosq` pointer.
pub struct Mosquitto {
// Raw client pointer returned by `mosquitto_new`.
mosq: *mut Mosq,
// True for the handle created by `new_session`; clones set this to false so
// that only one handle destroys the client on drop.
owned: bool,
}
impl Mosquitto {
pub fn new(id: &str) -> Result<Mosquitto> {
Mosquitto::new_session(id, true)
}
pub fn new_session(id: &str, clean_session: bool) -> Result<Mosquitto> {
if INSTANCES.fetch_add(1, Ordering::SeqCst) == 0 {
Error::result("mosquitto_lib_init", unsafe { mosquitto_lib_init() })?;
}
let mosq = unsafe {
mosquitto_new(cs(id).as_ptr(), clean_session, null_mut())
};
Ok(Mosquitto {
mosq: mosq,
owned: true,
})
}
pub fn callbacks<'a, T>(&'a self, data: T) -> Callbacks<'a, T> {
Callbacks::new(self, data)
}
pub fn connect(&self, host: &str, port: u32, keep_alive: u32) -> Result<()> {
Error::result("connect", unsafe {
mosquitto_connect(self.mosq, cs(host).as_ptr(), port as c_int, keep_alive as c_int)
})
}
pub fn connect_wait(&self, host: &str, port: u32, keep_alive: u32, millis: i32) -> Result<()> {
self.connect(host, port, keep_alive)?;
let t = Instant::now();
let wait = Duration::from_millis(millis as u64);
let mut callback = self.callbacks(MOSQ_CONNECT_ERR_TIMEOUT);
callback.on_connect(|data, rc| {
*data = rc;
});
loop {
self.do_loop(millis)?;
if callback.data == MOSQ_CONNECT_ERR_OK {
return Ok(());
};
if t.elapsed() > wait {
break;
}
}
Err(Error::new_connect(callback.data))
}
pub fn threaded(&self) -> Result<()> {
Error::result("mosquitto_threaded_set",
unsafe { mosquitto_threaded_set(self.mosq, true) })
}
pub fn set_option(&self, option: u32, value: i32) -> Result<()> {
Error::result("mosquitto_int_option",
unsafe { mosquitto_int_option(self.mosq, option, value) })
}
pub fn reconnect(&self) -> Result<()> {
Error::result("reconnect", unsafe {
mosquitto_reconnect(self.mosq)
})
}
pub fn reconnect_delay_set(&self, delay: u32, delay_max: u32, exponential_backoff: bool) -> Result<()> {
Error::result("delay_set", unsafe {
mosquitto_reconnect_delay_set(self.mosq,
delay as c_uint,
delay_max as c_uint,
exponential_backoff,
)
})
}
pub fn subscribe<'a>(&'a self, sub: &str, qos: u32) -> Result<TopicMatcher<'a>> {
let mut mid: c_int = 0;
let sub = cs(sub);
let rc = unsafe { mosquitto_subscribe(self.mosq, &mut mid, sub.as_ptr(), qos as c_int) };
if rc == 0 {
Ok(TopicMatcher::new(sub, mid, self))
} else {
Err(Error::new("subscribe", rc))
}
}
pub fn unsubscribe(&self, sub: &str) -> Result<i32> {
let mut mid = 0;
let rc = unsafe { mosquitto_unsubscribe(self.mosq, &mut mid, cs(sub).as_ptr()) };
if rc == 0 {
Ok(mid as i32)
} else {
Err(Error::new("unsubscribe", rc))
}
}
pub fn publish(&self, topic: &str, payload: &[u8], qos: u32, retain: bool) -> Result<i32> {
let mut mid = 0;
let rc = unsafe {
mosquitto_publish(
self.mosq, &mut mid, cs(topic).as_ptr(),
payload.len() as c_int, payload.as_ptr() as *mut c_void,
qos as c_int, retain,
)
};
if rc == 0 {
Ok(mid as i32)
} else {
Err(Error::new("publish", rc))
}
}
pub fn will_set(&self, topic: &str, payload: &[u8], qos: u32, retain: bool) -> Result<()> {
Error::result("will_set", unsafe {
mosquitto_will_set(
self.mosq, cs(topic).as_ptr(),
payload.len() as c_int, payload.as_ptr() as *mut c_void,
qos as c_int, retain,
)
})
}
pub fn will_clear(&self) -> Result<()> {
Error::result("will_clear", unsafe {
mosquitto_will_clear(self.mosq)
})
}
pub fn publish_wait(&self, topic: &str, payload: &[u8], qos: u32, retain: bool, millis: i32) -> Result<i32> {
let our_mid = self.publish(topic, payload, qos, retain)?;
let t = Instant::now();
let wait = Duration::from_millis(millis as u64);
let mut callback = self.callbacks(0);
callback.on_publish(|data, mid| {
*data = mid;
});
loop {
self.do_loop(millis)?;
if callback.data == our_mid {
return Ok(our_mid);
};
if t.elapsed() > wait {
break;
}
}
Err(Error::new("publish", MOSQ_ERR_UNKNOWN))
}
pub fn disconnect(&self) -> Result<()> {
Error::result("disconnect", unsafe {
mosquitto_disconnect(self.mosq)
})
}
pub fn do_loop(&self, timeout: i32) -> Result<()> {
Error::result("do_loop", unsafe {
mosquitto_loop(self.mosq, timeout as c_int, 1)
})
}
pub fn loop_forever(&self, timeout: i32) -> Result<()> {
Error::result("loop_forever", unsafe {
mosquitto_loop_forever(self.mosq, timeout as c_int, 1)
})
}
pub fn loop_until_disconnect(&self, timeout: i32) -> Result<()> {
if let Err(e) = self.loop_forever(timeout) {
if e.error() == sys::MOSQ_ERR_NO_CONN {
Ok(())
} else { Err(e)
}
} else {
Ok(())
}
}
pub fn tls_set<P1, P2, P3>(&self, cafile: P1, certfile: P2, keyfile: P3, passphrase: Option<&str>) -> Result<()>
where P1: AsRef<Path>, P2: AsRef<Path>, P3: AsRef<Path> {
Error::result("tls_set", unsafe {
let callback = if let Some(passphrase) = passphrase {
PASSWORD_PTR = cs(passphrase).into_raw();
PASSWORD_SIZE = passphrase.len();
true
} else {
false
};
mosquitto_tls_set(self.mosq,
cpath(cafile.as_ref()).as_ptr(), null() as *const c_char,
cpath(certfile.as_ref()).as_ptr(), cpath(keyfile.as_ref()).as_ptr(),
if callback { Some(mosq_password_callback) } else { None },
)
})
}
pub fn tls_psk_set(&self, psk: &str, identity: &str, ciphers: Option<&str>) -> Result<()> {
Error::result("tls_psk_set", unsafe {
let cipher;
let cipher_ptr = if let Some(ciphers) = ciphers {
cipher = cs(ciphers);
cipher.as_ptr()
} else {
null() as *const c_char
};
mosquitto_tls_psk_set(self.mosq, cs(psk).as_ptr(), cs(identity).as_ptr(), cipher_ptr)
})
}
}
// Process-wide key passphrase, written by `Mosquitto::tls_set` and read by
// `mosq_password_callback`. `PASSWORD_PTR` holds a C string obtained from
// `CString::into_raw`; `PASSWORD_SIZE` is its length without the trailing NUL.
static mut PASSWORD_PTR: *const c_char = 0 as *const c_char;
static mut PASSWORD_SIZE: usize = 0;
use std::ptr;
/// Password callback handed to `mosquitto_tls_set`: copies the stored passphrase
/// into `buf` and returns the number of bytes written (excluding the NUL).
///
/// At most `size - 1` bytes are copied so the terminator always fits; 0 is
/// returned when there is no buffer, no room, or no stored passphrase.
///
/// # Safety
/// `buf` must be valid for writes of `size` bytes.
unsafe extern "C" fn mosq_password_callback(buf: *mut c_char, size: c_int, _rwflag: c_int, _userdata: *mut Data) -> c_int {
    // Read the statics once, by value.
    let stored = PASSWORD_PTR;
    let stored_len = PASSWORD_SIZE;
    if buf.is_null() || size <= 0 || stored.is_null() {
        return 0;
    }
    // Never write past the caller's buffer; keep one byte for the terminator.
    let len = stored_len.min(size as usize - 1);
    ptr::copy(stored, buf, len);
    *buf.add(len) = 0;
    len as c_int
}
// Allow `Mosquitto` handles to be moved to and shared between threads despite
// the raw pointer field.
// NOTE(review): nothing in this file demonstrates that the underlying client is
// thread-safe — confirm against libmosquitto's threading rules (see `threaded`).
unsafe impl Send for Mosquitto {}
unsafe impl Sync for Mosquitto {}
impl Clone for Mosquitto {
    /// Returns a non-owning handle to the same underlying client; dropping the
    /// clone does not destroy the client.
    fn clone(&self) -> Mosquitto {
        let shared_handle = self.mosq;
        Mosquitto { mosq: shared_handle, owned: false }
    }
}
impl Drop for Mosquitto {
fn drop(&mut self) {
if self.owned {
unsafe { mosquitto_destroy(self.mosq); }
if INSTANCES.fetch_sub(1, Ordering::SeqCst) == 1 {
unsafe { mosquitto_lib_init(); }
}
}
}
}
/// Registry of optional event handlers for one client, together with
/// caller-supplied state of type `T` that every handler receives mutably.
pub struct Callbacks<'a, T> {
// One optional boxed handler per event kind; `None` until the matching `on_*` is called.
message_callback: Option<Box<dyn Fn(&mut T, MosqMessage) + 'a>>,
connect_callback: Option<Box<dyn Fn(&mut T, i32) + 'a>>,
publish_callback: Option<Box<dyn Fn(&mut T, i32) + 'a>>,
subscribe_callback: Option<Box<dyn Fn(&mut T, i32) + 'a>>,
unsubscribe_callback: Option<Box<dyn Fn(&mut T, i32) + 'a>>,
disconnect_callback: Option<Box<dyn Fn(&mut T, i32) + 'a>>,
log_callback: Option<Box<dyn Fn(&mut T, u32, &str) + 'a>>,
// The client these handlers are registered on.
mosq: &'a Mosquitto,
// Whether this value's address has already been registered as the client's user data.
init: bool,
/// State passed to every handler.
pub data: T,
}
impl<'a, T> Callbacks<'a, T> {
    /// Creates an empty registry for `mosq` carrying `data`; no handlers are
    /// installed until one of the `on_*` methods is called.
    pub fn new(mosq: &Mosquitto, data: T) -> Callbacks<T> {
        Callbacks {
            mosq,
            data,
            init: false,
            message_callback: None,
            connect_callback: None,
            publish_callback: None,
            subscribe_callback: None,
            unsubscribe_callback: None,
            disconnect_callback: None,
            log_callback: None,
        }
    }

    /// The client this registry is bound to.
    pub fn mosq(&self) -> &Mosquitto {
        self.mosq
    }

    /// Registers the address of `self` as the client's user data, once.
    ///
    /// The C trampolines cast that pointer back to `Callbacks<T>`, so the value
    /// must stay at the same address after the first `on_*` call.
    fn initialize(&mut self) {
        if self.init {
            return;
        }
        self.init = true;
        let this: *const Callbacks<T> = &*self;
        unsafe {
            mosquitto_user_data_set(self.mosq.mosq, this as *mut Data);
        }
    }

    /// Installs the handler invoked for each incoming message.
    pub fn on_message<C: Fn(&mut T, MosqMessage) + 'a>(&mut self, callback: C) {
        self.initialize();
        let handle = self.mosq.mosq;
        unsafe { mosquitto_message_callback_set(handle, Some(mosq_message_callback::<T>)); }
        self.message_callback = Some(Box::new(callback));
    }

    /// Installs the handler invoked with the code passed to the connect callback.
    pub fn on_connect<C: Fn(&mut T, i32) + 'a>(&mut self, callback: C) {
        self.initialize();
        let handle = self.mosq.mosq;
        unsafe { mosquitto_connect_callback_set(handle, Some(mosq_connect_callback::<T>)); }
        self.connect_callback = Some(Box::new(callback));
    }

    /// Installs the handler invoked with the integer passed to the publish callback.
    pub fn on_publish<C: Fn(&mut T, i32) + 'a>(&mut self, callback: C) {
        self.initialize();
        let handle = self.mosq.mosq;
        unsafe { mosquitto_publish_callback_set(handle, Some(mosq_publish_callback::<T>)); }
        self.publish_callback = Some(Box::new(callback));
    }

    /// Installs the handler invoked with the integer passed to the subscribe callback.
    pub fn on_subscribe<C: Fn(&mut T, i32) + 'a>(&mut self, callback: C) {
        self.initialize();
        let handle = self.mosq.mosq;
        unsafe { mosquitto_subscribe_callback_set(handle, Some(mosq_subscribe_callback::<T>)); }
        self.subscribe_callback = Some(Box::new(callback));
    }

    /// Installs the handler invoked with the integer passed to the unsubscribe callback.
    pub fn on_unsubscribe<C: Fn(&mut T, i32) + 'a>(&mut self, callback: C) {
        self.initialize();
        let handle = self.mosq.mosq;
        unsafe { mosquitto_unsubscribe_callback_set(handle, Some(mosq_unsubscribe_callback::<T>)); }
        self.unsubscribe_callback = Some(Box::new(callback));
    }

    /// Installs the handler invoked with the code passed to the disconnect callback.
    pub fn on_disconnect<C: Fn(&mut T, i32) + 'a>(&mut self, callback: C) {
        self.initialize();
        let handle = self.mosq.mosq;
        unsafe { mosquitto_disconnect_callback_set(handle, Some(mosq_disconnect_callback::<T>)); }
        self.disconnect_callback = Some(Box::new(callback));
    }

    /// Installs the handler invoked with the level and text of each log line.
    pub fn on_log<C: Fn(&mut T, u32, &str) + 'a>(&mut self, callback: C) {
        self.initialize();
        let handle = self.mosq.mosq;
        unsafe { mosquitto_log_callback_set(handle, Some(mosq_log_callback::<T>)); }
        self.log_callback = Some(Box::new(callback));
    }
}
impl<'a, T> Drop for Callbacks<'a, T> {
    /// Resets the client's user data to null so the C trampolines (which return
    /// early on a null pointer) no longer reach this value.
    fn drop(&mut self) {
        let cleared = null_mut::<Data>();
        unsafe {
            mosquitto_user_data_set(self.mosq.mosq, cleared);
        }
    }
}
// Reinterprets a raw user-data pointer as `&mut Callbacks<T>`.
// Must be expanded in an unsafe context; the pointer is expected to be the one
// registered by `Callbacks::initialize` for the same `T`.
macro_rules! callback_ref {
($data:expr,$T:ident) =>
{
&mut *($data as *mut Callbacks<$T>)
}
}
/// C trampoline for connect events: forwards `rc` to the registered Rust
/// handler, if any. Does nothing when no user data is set.
unsafe extern "C" fn mosq_connect_callback<T>(_: *mut Mosq, data: *mut Data, rc: c_int) {
    if data.is_null() {
        return;
    }
    let this = callback_ref!(data, T);
    if let Some(handler) = this.connect_callback.as_ref() {
        handler(&mut this.data, rc as i32);
    }
}
/// C trampoline for publish events: forwards the received integer to the
/// registered Rust handler, if any. Does nothing when no user data is set.
unsafe extern "C" fn mosq_publish_callback<T>(_: *mut Mosq, data: *mut Data, rc: c_int) {
    if data.is_null() {
        return;
    }
    let this = callback_ref!(data, T);
    if let Some(handler) = this.publish_callback.as_ref() {
        handler(&mut this.data, rc as i32);
    }
}
/// C trampoline for incoming messages: wraps the raw message as a borrowed
/// `MosqMessage` and hands it to the registered Rust handler, if any.
unsafe extern "C" fn mosq_message_callback<T>(_: *mut Mosq, data: *mut Data, message: *const Message) {
    if data.is_null() {
        return;
    }
    let this = callback_ref!(data, T);
    if let Some(handler) = this.message_callback.as_ref() {
        // Raw pointers are `Copy`; the wrapper borrows the message, it does not own it.
        let borrowed = MosqMessage::new(message, false);
        handler(&mut this.data, borrowed);
    }
}
/// C trampoline for subscribe events: forwards `rc` to the registered Rust
/// handler, if any. The QoS count and granted-QoS array are ignored.
unsafe extern "C" fn mosq_subscribe_callback<T>(_: *mut Mosq, data: *mut Data, rc: c_int, _qos_count: i32, _granted_qos: *const c_int) {
    if data.is_null() {
        return;
    }
    let this = callback_ref!(data, T);
    if let Some(handler) = this.subscribe_callback.as_ref() {
        handler(&mut this.data, rc as i32);
    }
}
/// C trampoline for unsubscribe events: forwards `rc` to the registered Rust
/// handler, if any. Does nothing when no user data is set.
unsafe extern "C" fn mosq_unsubscribe_callback<T>(_: *mut Mosq, data: *mut Data, rc: c_int) {
    if data.is_null() {
        return;
    }
    let this = callback_ref!(data, T);
    if let Some(handler) = this.unsubscribe_callback.as_ref() {
        handler(&mut this.data, rc as i32);
    }
}
/// C trampoline for disconnect events: forwards `rc` to the registered Rust
/// handler, if any. Does nothing when no user data is set.
unsafe extern "C" fn mosq_disconnect_callback<T>(_: *mut Mosq, data: *mut Data, rc: c_int) {
    if data.is_null() {
        return;
    }
    let this = callback_ref!(data, T);
    if let Some(handler) = this.disconnect_callback.as_ref() {
        handler(&mut this.data, rc as i32);
    }
}
/// C trampoline for log events: forwards the level and text to the registered
/// Rust handler, if any.
///
/// The text is converted lossily: invalid UTF-8 sequences are replaced rather
/// than panicking, because a panic must not unwind out of an `extern "C"`
/// function. Nothing is converted when no handler is registered.
unsafe extern "C" fn mosq_log_callback<T>(_: *mut Mosq, data: *mut Data, level: c_int, text: *const c_char) {
    if data.is_null() || text.is_null() {
        return;
    }
    let this = callback_ref!(data, T);
    if let Some(ref callback) = this.log_callback {
        let text = CStr::from_ptr(text).to_string_lossy();
        callback(&mut this.data, level as u32, &*text);
    }
}
// Smoke tests: each creates a client and checks that one setter call reports success.
#[cfg(test)]
mod test {
use super::*;
// Sets integer option 11 to 0 on a fresh client.
// NOTE(review): the meaning of option 11 is not visible here — confirm against
// the libmosquitto option constants.
#[test]
fn set_option() -> Result<()> {
let m = Mosquitto::new("test")?;
m.set_option(11, 0)
}
// Calls `threaded` on a fresh client and expects success.
#[test]
fn set_threaded() -> Result<()> {
let m = Mosquitto::new("test")?;
m.threaded()
}
}