use crate::Error;
pub(crate) use libmosquitto_sys as sys;
use std::cell::{Ref, RefCell};
use std::convert::TryInto;
use std::ffi::{CStr, CString};
use std::os::raw::{c_int, c_void};
use std::sync::Arc;
use std::sync::Once;
/// Guard ensuring the library-wide initialisation runs at most once per process.
static INIT: Once = Once::new();

/// Initialises libmosquitto the first time it is called; later calls are no-ops.
///
/// The return value of `mosquitto_lib_init` is discarded.
fn init_library() {
    let initialise = || {
        // SAFETY: `Once` guarantees this closure runs a single time.
        unsafe { sys::mosquitto_lib_init() };
    };
    INIT.call_once(initialise);
}
/// Version information reported by the linked libmosquitto library.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct LibraryVersion {
    /// Major version component.
    pub major: c_int,
    /// Minor version component.
    pub minor: c_int,
    /// Revision (patch) component.
    pub revision: c_int,
    /// The single-integer version value returned by `mosquitto_lib_version`.
    pub version: c_int,
}

impl std::fmt::Display for LibraryVersion {
    /// Formats the version as `major.minor.revision`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Components are emitted most-significant first.
        write!(f, "{}.{}.{}", self.major, self.minor, self.revision)
    }
}
pub fn lib_version() -> LibraryVersion {
init_library();
let mut vers = LibraryVersion {
major: 0,
minor: 0,
revision: 0,
version: 0,
};
unsafe {
vers.version =
sys::mosquitto_lib_version(&mut vers.major, &mut vers.minor, &mut vers.revision);
}
vers
}
pub(crate) fn cstr(s: &str) -> Result<CString, Error> {
Ok(CString::new(s)?)
}
/// A client handle wrapping a raw `mosquitto` instance.
///
/// `CB` is the user-supplied callback handler type; it defaults to `()`,
/// whose `Callbacks` implementation does nothing.
pub struct Mosq<CB = ()>
where
CB: Callbacks,
{
/// Raw library handle; passed to `mosquitto_destroy` when the value drops.
m: *mut sys::mosquitto,
/// Shared callback state whose address is handed to the library as user
/// data. `None` for the transient clients built inside callbacks.
cb: Option<Arc<CallbackWrapper<CB>>>,
}
// Manual opt-in to cross-thread use: the raw pointer field prevents the
// compiler from deriving these automatically.
// NOTE(review): `CallbackWrapper` keeps `CB` inside a `RefCell` and no
// `Send`/`Sync` bound is placed on `CB` here — confirm this is sound for
// every handler type before relying on it.
unsafe impl<CB: Callbacks> Sync for Mosq<CB> {}
unsafe impl<CB: Callbacks> Send for Mosq<CB> {}
impl<CB: Callbacks> Mosq<CB> {
pub fn with_auto_id(callbacks: CB) -> Result<Self, Error> {
init_library();
unsafe {
let cb = Arc::new(CallbackWrapper::new(callbacks));
let m = sys::mosquitto_new(std::ptr::null(), true, Arc::as_ptr(&cb) as *mut _);
if m.is_null() {
Err(Error::Create(std::io::Error::last_os_error()))
} else {
Ok(Self::set_callbacks(Self { m, cb: Some(cb) }))
}
}
}
pub fn with_id(callbacks: CB, id: &str, clean_session: bool) -> Result<Self, Error> {
init_library();
unsafe {
let cb = Arc::new(CallbackWrapper::new(callbacks));
let m = sys::mosquitto_new(
cstr(id)?.as_ptr(),
clean_session,
Arc::as_ptr(&cb) as *mut _,
);
if m.is_null() {
Err(Error::Create(std::io::Error::last_os_error()))
} else {
Ok(Self::set_callbacks(Self { m, cb: Some(cb) }))
}
}
}
pub fn set_username_and_password(
&self,
username: Option<&str>,
password: Option<&str>,
) -> Result<(), Error> {
let user;
let pass;
let username = match username {
Some(u) => {
user = cstr(u)?;
user.as_ptr()
}
None => std::ptr::null(),
};
let password = match password {
Some(p) => {
pass = cstr(p)?;
pass.as_ptr()
}
None => std::ptr::null(),
};
let err = unsafe { sys::mosquitto_username_pw_set(self.m, username, password) };
Error::result(err, ())
}
pub fn connect(
&self,
host: &str,
port: c_int,
keep_alive_interval: std::time::Duration,
bind_address: Option<&str>,
) -> Result<(), Error> {
let host = cstr(host)?;
let ba;
let bind_address = match bind_address {
Some(b) => {
ba = cstr(b)?;
ba.as_ptr()
}
None => std::ptr::null(),
};
let err = unsafe {
sys::mosquitto_connect_bind(
self.m,
host.as_ptr(),
port,
keep_alive_interval
.as_secs()
.try_into()
.map_err(|_| Error::Mosq(sys::mosq_err_t::MOSQ_ERR_INVAL))?,
bind_address,
)
};
Error::result(err, ())
}
pub fn connect_non_blocking(
&self,
host: &str,
port: c_int,
keep_alive_interval: std::time::Duration,
bind_address: Option<&str>,
) -> Result<(), Error> {
let host = cstr(host)?;
let ba;
let bind_address = match bind_address {
Some(b) => {
ba = cstr(b)?;
ba.as_ptr()
}
None => std::ptr::null(),
};
let err = unsafe {
sys::mosquitto_connect_bind_async(
self.m,
host.as_ptr(),
port,
keep_alive_interval
.as_secs()
.try_into()
.map_err(|_| Error::Mosq(sys::mosq_err_t::MOSQ_ERR_INVAL))?,
bind_address,
)
};
Error::result(err, ())
}
pub fn reconnect(&self) -> Result<(), Error> {
Error::result(unsafe { sys::mosquitto_reconnect(self.m) }, ())
}
pub fn disconnect(&self) -> Result<(), Error> {
Error::result(unsafe { sys::mosquitto_disconnect(self.m) }, ())
}
pub fn publish(
&self,
topic: &str,
payload: &[u8],
qos: QoS,
retain: bool,
) -> Result<MessageId, Error> {
let mut mid = 0;
let err = unsafe {
sys::mosquitto_publish(
self.m,
&mut mid,
cstr(topic)?.as_ptr(),
payload
.len()
.try_into()
.map_err(|_| Error::Mosq(sys::mosq_err_t::MOSQ_ERR_PAYLOAD_SIZE))?,
payload.as_ptr() as *const _,
qos as c_int,
retain,
)
};
Error::result(err, mid)
}
pub fn subscribe(&self, pattern: &str, qos: QoS) -> Result<MessageId, Error> {
let mut mid = 0;
let err = unsafe {
sys::mosquitto_subscribe(self.m, &mut mid, cstr(pattern)?.as_ptr(), qos as _)
};
Error::result(err, mid)
}
/// Registers the `CallbackWrapper::<CB>` trampolines for the connect,
/// disconnect, publish, subscribe and message events, then hands the
/// client back.
fn set_callbacks(self) -> Self {
    let handle = self.m;
    // SAFETY: `handle` is the handle owned by `self`; each trampoline has
    // the `extern "C"` signature accepted by its setter.
    unsafe {
        sys::mosquitto_connect_callback_set(handle, Some(CallbackWrapper::<CB>::connect));
        sys::mosquitto_disconnect_callback_set(handle, Some(CallbackWrapper::<CB>::disconnect));
        sys::mosquitto_publish_callback_set(handle, Some(CallbackWrapper::<CB>::publish));
        sys::mosquitto_subscribe_callback_set(handle, Some(CallbackWrapper::<CB>::subscribe));
        sys::mosquitto_message_callback_set(handle, Some(CallbackWrapper::<CB>::message));
    }
    self
}
/// Borrows the callback handler that was supplied at construction.
///
/// # Panics
/// Panics when the client carries no callback state, as is the case for
/// the transient clients handed to callbacks. Also subject to the usual
/// `RefCell::borrow` panic.
pub fn get_callbacks(&self) -> Ref<CB> {
    let wrapper = self
        .cb
        .as_ref()
        .expect("get_callbacks not to be called on a transient Mosq");
    wrapper.cb.borrow()
}
pub fn loop_until_explicitly_disconnected(
&self,
timeout: std::time::Duration,
) -> Result<(), Error> {
unsafe {
let max_packets = 1;
Error::result(
sys::mosquitto_loop_forever(
self.m,
timeout
.as_millis()
.try_into()
.map_err(|_| Error::Mosq(sys::mosq_err_t::MOSQ_ERR_INVAL))?,
max_packets,
),
(),
)
}
}
pub fn start_loop_thread(&self) -> Result<(), Error> {
unsafe { Error::result(sys::mosquitto_loop_start(self.m), ()) }
}
pub fn stop_loop_thread(&self, force_cancel: bool) -> Result<(), Error> {
unsafe { Error::result(sys::mosquitto_loop_stop(self.m, force_cancel), ()) }
}
}
/// Holder for the user's callback handler; its address is what the
/// library receives as the user-data pointer and passes back to the
/// `extern "C"` trampolines.
struct CallbackWrapper<T: Callbacks> {
/// The handler itself; the trampolines take shared borrows of it.
cb: RefCell<T>,
}
/// Pairs a `RefCell` borrow guard with a raw pointer that `Deref` hands out
/// as `&T`.
///
/// NOTE(review): nothing in the visible source constructs this type;
/// presumably `r` points at data kept alive by `_guard` — confirm before use.
pub struct CallbackGuard<'a, T> {
/// Held only to keep the borrow active; never read.
_guard: Ref<'a, Option<Box<dyn Callbacks>>>,
/// Pointer dereferenced by the `Deref` implementation.
r: *const T,
}
impl<'a, T> std::ops::Deref for CallbackGuard<'a, T> {
    type Target = T;

    /// Reborrows the stored raw pointer as a shared reference.
    fn deref(&self) -> &Self::Target {
        let target: *const T = self.r;
        // SAFETY: relies on whoever built the guard having stored a pointer
        // that remains valid while the guard exists.
        unsafe { &*target }
    }
}
/// Runs `func` with a temporary `Mosq` that borrows the raw handle `m`.
///
/// The temporary client has no callback state (`cb` is `None`) and must
/// never run `Mosq`'s destructor, because the handle is owned elsewhere.
/// It is wrapped in `ManuallyDrop` so the destructor is suppressed on
/// every exit path, including when `func` panics and the stack unwinds.
fn with_transient_client<F: FnOnce(&mut Mosq)>(m: *mut sys::mosquitto, func: F) {
    let mut client: std::mem::ManuallyDrop<Mosq> =
        std::mem::ManuallyDrop::new(Mosq { m, cb: None });
    func(&mut *client);
}
impl<T: Callbacks> CallbackWrapper<T> {
/// Wraps the handler `cb` in a fresh `RefCell`.
fn new(cb: T) -> Self {
    let cell = RefCell::new(cb);
    Self { cb: cell }
}
/// Reinterprets the user-data pointer received from the library as a
/// reference to this wrapper.
///
/// # Safety
/// `cb` must point at a live `CallbackWrapper<T>` that stays valid for
/// the chosen lifetime `'a`.
unsafe fn resolve_self<'a>(cb: *mut c_void) -> &'a Self {
    let wrapper = cb as *const Self;
    &*wrapper
}
/// C trampoline registered for connect events; forwards to
/// `Callbacks::on_connect` with a transient client for `m`.
unsafe extern "C" fn connect(m: *mut sys::mosquitto, cb: *mut c_void, rc: c_int) {
    let wrapper = Self::resolve_self(cb);
    with_transient_client(m, |client| {
        let handler = wrapper.cb.borrow();
        handler.on_connect(client, rc);
    });
}
/// C trampoline registered for disconnect events; forwards to
/// `Callbacks::on_disconnect` with a transient client for `m`.
unsafe extern "C" fn disconnect(m: *mut sys::mosquitto, cb: *mut c_void, rc: c_int) {
    let wrapper = Self::resolve_self(cb);
    with_transient_client(m, |client| {
        let handler = wrapper.cb.borrow();
        handler.on_disconnect(client, rc);
    });
}
/// C trampoline registered for publish events; forwards to
/// `Callbacks::on_publish` with a transient client for `m`.
unsafe extern "C" fn publish(m: *mut sys::mosquitto, cb: *mut c_void, mid: MessageId) {
    let wrapper = Self::resolve_self(cb);
    with_transient_client(m, |client| {
        let handler = wrapper.cb.borrow();
        handler.on_publish(client, mid);
    });
}
/// C trampoline registered for subscribe events; converts the granted
/// QoS array and forwards to `Callbacks::on_subscribe` with a transient
/// client for `m`.
///
/// A null `granted_qos` pointer or a non-positive `qos_count` is treated
/// as an empty list, because `slice::from_raw_parts` requires a non-null
/// pointer and a length that really describes the allocation.
unsafe extern "C" fn subscribe(
    m: *mut sys::mosquitto,
    cb: *mut c_void,
    mid: MessageId,
    qos_count: c_int,
    granted_qos: *const c_int,
) {
    let cb = Self::resolve_self(cb);
    with_transient_client(m, |client| {
        let granted: Vec<QoS> = if granted_qos.is_null() || qos_count <= 0 {
            Vec::new()
        } else {
            // `qos_count` is known to be positive here, so the cast
            // cannot wrap.
            std::slice::from_raw_parts(granted_qos, qos_count as usize)
                .iter()
                .map(QoS::from_int)
                .collect()
        };
        cb.cb.borrow().on_subscribe(client, mid, &granted);
    });
}
/// C trampoline registered for message events; unpacks the library's
/// message struct and forwards to `Callbacks::on_message` with a
/// transient client for `m`.
///
/// The topic is converted lossily to an owned `String`. A null payload
/// pointer or a non-positive payload length is delivered as an empty
/// slice, because `slice::from_raw_parts` must not be given a null
/// pointer even for a zero-length slice.
unsafe extern "C" fn message(
    m: *mut sys::mosquitto,
    cb: *mut c_void,
    msg: *const sys::mosquitto_message,
) {
    let cb = Self::resolve_self(cb);
    with_transient_client(m, |client| {
        let msg = &*msg;
        let topic = CStr::from_ptr(msg.topic).to_string_lossy().into_owned();
        let payload: &[u8] = if msg.payload.is_null() || msg.payloadlen <= 0 {
            &[]
        } else {
            // `payloadlen` is known to be positive here, so the cast
            // cannot wrap.
            std::slice::from_raw_parts(msg.payload as *const u8, msg.payloadlen as usize)
        };
        cb.cb.borrow().on_message(
            client,
            msg.mid,
            topic,
            payload,
            QoS::from_int(&msg.qos),
            msg.retain,
        );
    });
}
}
/// Message identifier as exchanged with the library: returned by `publish`
/// and `subscribe`, and passed to the publish/subscribe/message callbacks.
pub type MessageId = c_int;
/// Event handlers invoked from the library's callbacks.
///
/// Every method has an empty default body, so implementors override only
/// the events they care about. Each handler receives a transient `Mosq`
/// wrapping the handle the event was raised for; that client carries no
/// callback state, so calling `get_callbacks` on it panics.
pub trait Callbacks {
/// Invoked from the connect callback; `_reason` is the integer code the
/// library passed.
fn on_connect(&self, _client: &mut Mosq, _reason: c_int) {}
/// Invoked from the disconnect callback; `_reason` is the integer code
/// the library passed.
fn on_disconnect(&self, _client: &mut Mosq, _reason: c_int) {}
/// Invoked from the publish callback with the message id the library
/// reported.
fn on_publish(&self, _client: &mut Mosq, _mid: MessageId) {}
/// Invoked from the subscribe callback with the message id and the
/// granted QoS values converted through `QoS::from_int`.
fn on_subscribe(&self, _client: &mut Mosq, _mid: MessageId, _granted_qos: &[QoS]) {}
/// Invoked from the message callback with the message id, the topic
/// (lossily converted to a `String`), the payload bytes, the QoS and the
/// retain flag taken from the library's message struct.
fn on_message(
&self,
_client: &mut Mosq,
_mid: MessageId,
_topic: String,
_payload: &[u8],
_qos: QoS,
_retain: bool,
) {
}
}
/// No-op handler: `()` inherits every empty default, and is the default
/// `CB` parameter of `Mosq`.
impl Callbacks for () {}
/// Quality-of-service level; the discriminants are the integer values
/// passed to and received from the library.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QoS {
    AtMostOnce = 0,
    AtLeastOnce = 1,
    ExactlyOnce = 2,
}

impl Default for QoS {
    /// The default level is `AtMostOnce`.
    fn default() -> Self {
        Self::AtMostOnce
    }
}

impl QoS {
    /// Maps a raw integer to a `QoS`.
    ///
    /// `0` and `1` map to their named levels; `2` and every other value
    /// map to `ExactlyOnce`.
    fn from_int(i: &c_int) -> QoS {
        match *i {
            0 => QoS::AtMostOnce,
            1 => QoS::AtLeastOnce,
            _ => QoS::ExactlyOnce,
        }
    }
}
impl<CB: Callbacks> Drop for Mosq<CB> {
    /// Hands the raw handle to `mosquitto_destroy`.
    fn drop(&mut self) {
        let handle = self.m;
        // SAFETY: transient clients that merely borrow a handle are kept
        // from reaching this destructor by `with_transient_client`.
        unsafe { sys::mosquitto_destroy(handle) };
    }
}