paho-mqtt 0.12.0

The Official Eclipse Paho MQTT Rust Client Library. This is a wrapper around the Paho C library.
Documentation
// paho-mqtt/src/async_client.rs
//
// This file is part of the Eclipse Paho MQTT Rust Client library.

/*******************************************************************************
 * Copyright (c) 2017-2023 Frank Pagliughi <fpagliughi@mindspring.com>
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    http://www.eclipse.org/legal/epl-v10.html
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Frank Pagliughi - initial implementation and documentation
 *******************************************************************************/

//! The Asynchronous client module for the Paho MQTT Rust client library.
//!
//! This presents an asynchronous API that is similar to the other Paho MQTT
//! clients, but uses Token objects that implement the Futures trait, so
//! can be used in much more flexible ways than the other language clients.
//!
//! Asynchronous operations return a `Token` that is a type of future. It
//! can be used to determine if an operation has completed, block and wait
//! for the operation to complete, and obtain the final result.
//! For example, you can start a connection, do something else, and then
//! wait for the connection to complete.
//!
//! ```
//! use futures::future::Future;
//! use paho_mqtt as mqtt;
//!
//! let cli = mqtt::AsyncClient::new("tcp://localhost:1883").unwrap();
//!
//! // Start an async operation and get the token for it.
//! let tok = cli.connect(mqtt::ConnectOptions::new());
//!
//! // ...do something else...
//!
//! // Wait for the async operation to complete.
//! tok.wait().unwrap();
//! ```

use crate::{
    client_persistence::UserPersistence,
    connect_options::ConnectOptions,
    create_options::{CreateOptions, PersistenceType},
    disconnect_options::{DisconnectOptions, DisconnectOptionsBuilder},
    errors::{self, Error, Result},
    ffi,
    message::Message,
    properties::Properties,
    reason_code::ReasonCode,
    response_options::{ResponseOptions, ResponseOptionsBuilder},
    server_response::ServerRequest,
    string_collection::StringCollection,
    subscribe_options::SubscribeOptions,
    token::{ConnectToken, DeliveryToken, SubscribeManyToken, SubscribeToken, Token},
    types::*,
    AsyncReceiver, Receiver, UserData,
};
use crossbeam_channel as channel;
use std::{
    ffi::{CStr, CString},
    os::raw::{c_char, c_int, c_void},
    ptr, slice, str,
    sync::{Arc, Mutex, atomic::{AtomicU32, Ordering}},
    time::Duration,
};

/////////////////////////////////////////////////////////////////////////////
// AsynClient

/// An asynchronous MQTT connection client.
#[derive(Clone)]
pub struct AsyncClient {
    pub(crate) inner: Arc<InnerAsyncClient>,
}

/// Implementation details for the asynchronous MQTT connection client.
pub(crate) struct InnerAsyncClient {
    // The handle to the Paho C client
    handle: ffi::MQTTAsync,
    // The MQTT version of the connection
    mqtt_version: AtomicU32,
    // The options for connecting to the broker
    opts: Mutex<ConnectOptions>,
    // The context to give to the C callbacks
    callback_context: Mutex<CallbackContext>,
    // The server URI
    server_uri: CString,
    // The MQTT client ID name
    client_id: CString,
    // The user persistence (if any)
    user_persistence: Option<Box<UserPersistence>>,
    // Arbitrary, user-supplied data
    user_data: Option<UserData>,
}

/// User callback type for when the client is connected.
pub type ConnectedCallback = dyn FnMut(&AsyncClient) + Send + 'static;

/// User callback type for when the connection is lost from the broker.
pub type ConnectionLostCallback = dyn FnMut(&AsyncClient) + Send + 'static;

/// User callback type for when the client receives a disconnect packet.
pub type DisconnectedCallback = dyn FnMut(&AsyncClient, Properties, ReasonCode) + Send + 'static;

/// User callback signature for when subscribed messages are received.
pub type MessageArrivedCallback = dyn FnMut(&AsyncClient, Option<Message>) + Send + 'static;

// The context provided for the client callbacks.
//
// Originally these needed to be kept together and managed with a single
// context in the C lib. Now, we just keep them together to easily manage
// for thread-protection with a Mutex.
// These are now independent, so don't need to be kept inside a single mutex.
// Even better, it would be nice to be able to run the callbacks lock-free.
#[derive(Default)]
struct CallbackContext {
    /// Callback for when the client successfully connects.
    on_connected: Option<Box<ConnectedCallback>>,
    /// Callback for when the client loses connection to the server.
    on_connection_lost: Option<Box<ConnectionLostCallback>>,
    /// Callback for when the client receives a disconnect packet.
    on_disconnected: Option<Box<DisconnectedCallback>>,
    /// Callback for when a message arrives from the server.
    on_message_arrived: Option<Box<MessageArrivedCallback>>,
}

impl AsyncClient {
    /// Creates a new MQTT client which can connect to an MQTT broker.
    ///
    /// # Arguments
    ///
    /// `opts` The create options for the client.
    ///
    pub fn new<T>(opts: T) -> Result<AsyncClient>
    where
        T: Into<CreateOptions>,
    {
        let mut opts = opts.into();
        debug!("Create options: {:?}", opts);

        let mut cli = InnerAsyncClient {
            handle: ptr::null_mut(),
            mqtt_version: AtomicU32::new(MQTT_VERSION_DEFAULT),
            opts: Mutex::new(ConnectOptions::new()),
            callback_context: Mutex::new(CallbackContext::default()),
            server_uri: CString::new(opts.server_uri)?,
            client_id: CString::new(opts.client_id)?,
            user_persistence: None,
            user_data: opts.user_data,
        };

        // We might need this for file persistence path
        let file_path;

        let (ptype, pptr) = match opts.persistence {
            PersistenceType::None => (ffi::MQTTCLIENT_PERSISTENCE_NONE, ptr::null_mut()),
            PersistenceType::File => (ffi::MQTTCLIENT_PERSISTENCE_DEFAULT, ptr::null_mut()),
            PersistenceType::FilePath(path) => {
                let s = path.to_str().ok_or(errors::PersistenceError)?;
                file_path = CString::new(s).unwrap_or_default();
                let pptr = file_path.as_ptr() as *mut c_void;
                (ffi::MQTTCLIENT_PERSISTENCE_DEFAULT, pptr)
            }
            PersistenceType::User(cli_persist) => {
                let mut user_persistence = Box::new(UserPersistence::new(cli_persist));
                let pptr = &mut user_persistence.copts as *mut _ as *mut c_void;
                cli.user_persistence = Some(user_persistence);
                (ffi::MQTTCLIENT_PERSISTENCE_USER, pptr)
            }
        };

        debug!("Creating client with persistence: {}", ptype);

        let rc = unsafe {
            ffi::MQTTAsync_createWithOptions(
                &mut cli.handle as *mut *mut c_void,
                cli.server_uri.as_ptr(),
                cli.client_id.as_ptr(),
                ptype as c_int,
                pptr,
                &mut opts.copts,
            ) as i32
        };

        if rc != 0 {
            warn!("Create result: {}", rc);
            return Err(rc.into());
        }

        debug!("AsyncClient handle: {:?}", cli.handle);
        Ok(AsyncClient {
            inner: Arc::new(cli),
        })
    }

    /// Constructs a client from a raw pointer to the inner structure.
    /// This is how the client is normally reconstructed from a context
    /// pointer coming back from the C lib.
    pub(crate) unsafe fn from_raw(ptr: *mut c_void) -> AsyncClient {
        AsyncClient {
            inner: Arc::from_raw(ptr as *mut InnerAsyncClient),
        }
    }

    /// Consumes the client, returning the inner wrapped value.
    /// This is how a client can be passed to the C lib as a context pointer.
    pub(crate) fn into_raw(self) -> *mut c_void {
        Arc::into_raw(self.inner) as *mut c_void
    }

    // Low-level callback from the C library when the client is connected.
    // We just pass the call on to the handler registered with the client, if any.
    unsafe extern "C" fn on_connected(context: *mut c_void, _cause: *mut c_char) {
        debug!("Connected! {:?}", context);

        if !context.is_null() {
            let cli = AsyncClient::from_raw(context);

            if let Some(ref mut cb) = cli.inner.callback_context.lock().unwrap().on_connected {
                trace!("Invoking connected callback");
                cb(&cli);
            }

            let _ = cli.into_raw();
        }
    }

    // Low-level callback from the C library when the connection is lost.
    // We pass the call on to the handler registered with the client, if any.
    unsafe extern "C" fn on_connection_lost(context: *mut c_void, _cause: *mut c_char) {
        warn!("Connection lost. Context: {:?}", context);

        if !context.is_null() {
            let cli = AsyncClient::from_raw(context);
            {
                let mut cbctx = cli.inner.callback_context.lock().unwrap();

                // Push a None into the message stream to cleanly
                // shutdown any consumers.
                if let Some(ref mut cb) = cbctx.on_message_arrived {
                    trace!("Invoking message callback with None");
                    cb(&cli, None);
                }

                if let Some(ref mut cb) = cbctx.on_connection_lost {
                    trace!("Invoking connection lost callback");
                    cb(&cli);
                }
            }
            let _ = cli.into_raw();
        }
    }

    // Low-level callback from the C library for when a disconnect packet arrives.
    unsafe extern "C" fn on_disconnected(
        context: *mut c_void,
        cprops: *mut ffi::MQTTProperties,
        reason: ffi::MQTTReasonCodes,
    ) {
        debug!(
            "Disconnected on context {:?}, with reason code: {}",
            context, reason
        );

        if !context.is_null() {
            let cli = AsyncClient::from_raw(context);
            if let Some(ref mut cb) = cli.inner.callback_context.lock().unwrap().on_disconnected {
                let reason_code = ReasonCode::from(reason);
                let props = Properties::from_c_struct(&*cprops);
                trace!("Invoking disconnected callback");
                cb(&cli, props, reason_code);
            }
            let _ = cli.into_raw();
        }
    }

    // Low-level callback from the C library when a message arrives from the broker.
    // We pass the call on to the handler registered with the client, if any.
    unsafe extern "C" fn on_message_arrived(
        context: *mut c_void,
        topic_name: *mut c_char,
        topic_len: c_int,
        mut cmsg: *mut ffi::MQTTAsync_message,
    ) -> c_int {
        debug!(
            "Message arrived. Context: {:?}, topic: {:?} len {:?} cmsg: {:?}: {:?}",
            context, topic_name, topic_len, cmsg, *cmsg
        );

        if !context.is_null() {
            let cli = AsyncClient::from_raw(context);

            if let Some(ref mut cb) = cli
                .inner
                .callback_context
                .lock()
                .unwrap()
                .on_message_arrived
            {
                let len = topic_len as usize;
                let topic = if len == 0 {
                    // Zero-len topic means it's a NUL-terminated C string
                    CStr::from_ptr(topic_name).to_owned()
                }
                else {
                    // If we get a len for the topic, then there's no NUL terminator.
                    // TODO: Handle UTF-8 error(s)
                    let tp =
                        str::from_utf8(slice::from_raw_parts(topic_name as *mut u8, len)).unwrap();
                    CString::new(tp).unwrap()
                };
                let msg = Message::from_c_parts(topic, &*cmsg);

                trace!("Invoking message callback");
                cb(&cli, Some(msg));
            }

            let _ = cli.into_raw();
        }

        ffi::MQTTAsync_freeMessage(&mut cmsg);
        ffi::MQTTAsync_free(topic_name as *mut c_void);
        1
    }

    /// Gets the most recent MQTT version for the client.
    ///
    /// This is the version of the current connection, or the most recent
    /// connection if currently disconnected. Before an initial connection
    /// is made, this will report MQTT_VERSION_DEFAULT (0).
    pub fn mqtt_version(&self) -> u32 {
        self.inner.mqtt_version.load(Ordering::SeqCst)
    }

    /// Sets the current MQTT version.
    /// This is set when a connection is requested or established.
    pub(crate) fn set_mqtt_version(&self, ver: u32) {
        trace!("Updating client MQTT version: {}", ver);
        self.inner.mqtt_version.store(ver, Ordering::SeqCst);
    }

    /// Get access to the user-defined data in the client.
    ///
    /// This returns a reference to a read/write lock around the user data so
    /// that the application can access the data, as needed from any outside
    /// thread or a callback.
    ///
    /// Note that it's up to the application to ensure that it doesn't
    /// deadlock the callback thread when accessing the user data.
    pub fn user_data(&self) -> Option<&UserData> {
        self.inner.user_data.as_ref()
    }

    /// Connects to an MQTT broker using the specified connect options.
    ///
    /// # Arguments
    ///
    /// * `opts` The connect options. This can be `None`, in which case the
    ///          default options are used.
    ///
    pub fn connect<T>(&self, opts: T) -> ConnectToken
    where
        T: Into<Option<ConnectOptions>>,
    {
        debug!("Connecting handle: {:?}", self.inner.handle);

        let mut opts = opts.into().unwrap_or_default();
        self.set_mqtt_version(opts.mqtt_version());

        let tok = Token::from_request(self, ServerRequest::Connect);
        opts.set_token(tok.clone());

        debug!("Connect options: {:?}", opts);
        let mut lkopts = self.inner.opts.lock().unwrap();
        *lkopts = opts;

        let rc = unsafe { ffi::MQTTAsync_connect(self.inner.handle, &lkopts.copts) };

        if rc != 0 {
            let _ = unsafe { Token::from_raw(lkopts.copts.context) };
            return ConnectToken::from_error(rc);
        }

        tok
    }

    /// Connects to an MQTT broker using the specified connect options.
    ///
    /// # Arguments
    ///
    /// * `opts` The connect options
    /// * `success_cb` The callback for a successful connection.
    /// * `failure_cb` The callback for a failed connection attempt.
    pub fn connect_with_callbacks<FS, FF>(
        &self,
        mut opts: ConnectOptions,
        success_cb: FS,
        failure_cb: FF,
    ) -> ConnectToken
    where
        FS: Fn(&AsyncClient, u16) + Send + 'static,
        FF: Fn(&AsyncClient, u16, i32) + Send + 'static,
    {
        debug!("Connecting handle with callbacks: {:?}", self.inner.handle);
        self.set_mqtt_version(opts.mqtt_version());

        let tok = Token::from_client(self, ServerRequest::Connect, success_cb, failure_cb);
        opts.set_token(tok.clone());

        debug!("Connect opts: {:?}", opts);
        unsafe {
            if !opts.copts.will.is_null() {
                debug!("Will: {:?}", *(opts.copts.will));
            }
        }
        let mut lkopts = self.inner.opts.lock().unwrap();
        *lkopts = opts;

        let rc = unsafe { ffi::MQTTAsync_connect(self.inner.handle, &lkopts.copts) };

        if rc != 0 {
            let _ = unsafe { Token::from_raw(lkopts.copts.context) };
            return ConnectToken::from_error(rc);
        }

        tok
    }

    /// Attempts to reconnect to the broker.
    /// This can only be called after a connection was initially made or
    /// attempted. It will retry with the same connect options.
    ///
    pub fn reconnect(&self) -> ConnectToken {
        let connopts = self.inner.opts.lock().unwrap().clone();
        self.connect(connopts)
    }

    /// Attempts to reconnect to the broker, using callbacks to signal
    /// completion.
    /// This can only be called after a connection was initially made or
    /// attempted. It will retry with the same connect options.
    ///
    /// # Arguments
    ///
    /// * `success_cb` The callback for a successful connection.
    /// * `failure_cb` The callback for a failed connection attempt.
    ///
    pub fn reconnect_with_callbacks<FS, FF>(&self, success_cb: FS, failure_cb: FF) -> ConnectToken
    where
        FS: Fn(&AsyncClient, u16) + Send + 'static,
        FF: Fn(&AsyncClient, u16, i32) + Send + 'static,
    {
        let connopts = self.inner.opts.lock().unwrap().clone();
        self.connect_with_callbacks(connopts, success_cb, failure_cb)
    }

    /// Disconnects from the MQTT broker.
    ///
    /// # Arguments
    ///
    /// `opt_opts` Optional disconnect options. Specifying `None` will use
    ///            default of immediate (zero timeout) disconnect.
    ///
    pub fn disconnect<T>(&self, opt_opts: T) -> Token
    where
        T: Into<Option<DisconnectOptions>>,
    {
        let mut opts = opt_opts.into().unwrap_or_default();
        debug!("Disconnecting");
        trace!("Disconnect options: {:?}", opts);

        let tok = Token::new();
        opts.set_token(tok.clone());

        let rc = unsafe { ffi::MQTTAsync_disconnect(self.inner.handle, &opts.copts) };

        if rc != 0 {
            let _ = unsafe { Token::from_raw(opts.copts.context) };
            return Token::from_error(rc);
        }

        // Push a None into the message stream to cleanly
        // shutdown any consumers.
        if let Some(ref mut cb) = self
            .inner
            .callback_context
            .lock()
            .unwrap()
            .on_message_arrived
        {
            trace!("Invoking message callback with None");
            cb(self, None);
        }
        tok
    }

    /// Disconnect from the MQTT broker with a timeout.
    /// This will delay the disconnect for up to the specified timeout to
    /// allow in-flight messages to complete.
    /// This is the same as calling disconnect with options specifying a
    /// timeout.
    ///
    /// # Arguments
    ///
    /// `timeout` The amount of time to wait for the disconnect. This has
    ///           a resolution in milliseconds.
    ///
    pub fn disconnect_after(&self, timeout: Duration) -> Token {
        let disconn_opts = DisconnectOptionsBuilder::new().timeout(timeout).finalize();
        self.disconnect(disconn_opts)
    }

    /// Determines if this client is currently connected to an MQTT broker.
    pub fn is_connected(&self) -> bool {
        unsafe { ffi::MQTTAsync_isConnected(self.inner.handle) != 0 }
    }

    /// Sets the callback for when the connection is established with the broker.
    ///
    /// # Arguments
    ///
    /// * `cb` The callback to register with the library. This can be a
    ///     function or a closure.
    pub fn set_connected_callback<F>(&self, cb: F)
    where
        F: FnMut(&AsyncClient) + Send + 'static,
    {
        // A pointer to the inner client will serve as the callback context
        let inner: &InnerAsyncClient = &self.inner;

        // This should be protected by a mutex if we'll have a thread-safe client
        inner.callback_context.lock().unwrap().on_connected = Some(Box::new(cb));

        unsafe {
            ffi::MQTTAsync_setConnected(
                inner.handle,
                inner as *const _ as *mut c_void,
                Some(AsyncClient::on_connected),
            );
        }
    }

    /// Removes the callback for when the conection is established
    pub fn remove_connected_callback(&self) {
        self.inner.callback_context.lock().unwrap().on_connected = None;

        unsafe {
            ffi::MQTTAsync_setConnected(self.inner.handle, ptr::null_mut(), None);
        }
    }

    /// Sets the callback for when the connection is lost with the broker.
    ///
    /// # Arguments
    ///
    /// * `cb` The callback to register with the library. This can be a
    ///     function or a closure.
    pub fn set_connection_lost_callback<F>(&self, cb: F)
    where
        F: FnMut(&AsyncClient) + Send + 'static,
    {
        // A pointer to the inner client will serve as the callback context
        let inner: &InnerAsyncClient = &self.inner;

        // This should be protected by a mutex if we'll have a thread-safe client
        inner.callback_context.lock().unwrap().on_connection_lost = Some(Box::new(cb));

        unsafe {
            ffi::MQTTAsync_setConnectionLostCallback(
                inner.handle,
                inner as *const _ as *mut c_void,
                Some(AsyncClient::on_connection_lost),
            );
        }
    }

    /// Removes the callback for when the connection is lost
    pub fn remove_connection_lost_callback(&self) {
        self.inner
            .callback_context
            .lock()
            .unwrap()
            .on_connection_lost = None;

        // TODO: We should only remove the C handler if we know that
        // we're not consuming or streaming. For now, keeping it is a
        // very minor performance hit.
        /*
        unsafe {
            ffi::MQTTAsync_setConnectionLostCallback(
                self.inner.handle,
                ptr::null_mut(),
                None
            );
        }
        */
    }

    /// Sets the callback for when a disconnect message arrives from the broker.
    ///
    /// # Arguments
    ///
    /// * `cb` The callback to register with the library. This can be a
    ///     function or a closure.
    pub fn set_disconnected_callback<F>(&self, cb: F)
    where
        F: FnMut(&AsyncClient, Properties, ReasonCode) + Send + 'static,
    {
        // A pointer to the inner client will serve as the callback context
        let inner: &InnerAsyncClient = &self.inner;

        // This should be protected by a mutex if we'll have a thread-safe client
        inner.callback_context.lock().unwrap().on_disconnected = Some(Box::new(cb));

        unsafe {
            ffi::MQTTAsync_setDisconnected(
                inner.handle,
                inner as *const _ as *mut c_void,
                Some(AsyncClient::on_disconnected),
            );
        }
    }

    /// Removes the callback for when a disconnect message is received from the broker.
    pub fn remove_disconnected_callback(&self) {
        self.inner.callback_context.lock().unwrap().on_disconnected = None;

        unsafe {
            ffi::MQTTAsync_setDisconnected(self.inner.handle, ptr::null_mut(), None);
        }
    }

    /// Sets the callback for when a message arrives from the broker.
    ///
    /// # Arguments
    ///
    /// * `cb` The callback to register with the library. This can be a
    ///     function or a closure.
    ///
    pub fn set_message_callback<F>(&self, cb: F)
    where
        F: FnMut(&AsyncClient, Option<Message>) + Send + 'static,
    {
        // A pointer to the inner client will serve as the callback context
        let inner: &InnerAsyncClient = &self.inner;

        // This should be protected by a mutex if we'll have a thread-safe client
        inner.callback_context.lock().unwrap().on_message_arrived = Some(Box::new(cb));

        unsafe {
            ffi::MQTTAsync_setMessageArrivedCallback(
                self.inner.handle,
                inner as *const _ as *mut c_void,
                Some(AsyncClient::on_message_arrived),
            );
        }
    }

    /// Removes the callback for when a message arrives from the broker.
    pub fn remove_message_callback(&self) {
        self.inner
            .callback_context
            .lock()
            .unwrap()
            .on_message_arrived = None;

        unsafe {
            ffi::MQTTAsync_setMessageArrivedCallback(self.inner.handle, ptr::null_mut(), None);
        }
    }

    /// Attempts to publish a message to the MQTT broker, but returns an
    /// error immediately if there's a problem creating or queuing the
    /// message.
    ///
    /// Returns a Publish Error on failure so that the original message
    /// can be recovered and sent again.
    pub fn try_publish(&self, msg: Message) -> Result<DeliveryToken> {
        debug!("Publish: {:?}", msg);

        let ver = self.mqtt_version();
        let tok = DeliveryToken::new(msg);
        let mut rsp_opts = ResponseOptions::new(ver, tok.clone());

        let rc = unsafe {
            let msg = tok.message();
            ffi::MQTTAsync_sendMessage(
                self.inner.handle,
                msg.topic().as_ptr() as *const c_char,
                &msg.cmsg,
                &mut rsp_opts.copts,
            )
        };

        if rc != 0 {
            let _ = unsafe { Token::from_raw(rsp_opts.copts.context) };
            let msg: Message = tok.into();
            return Err(Error::Publish(rc, msg));
        }

        tok.set_msgid(rsp_opts.copts.token as i16);
        Ok(tok)
    }

    /// Publishes a message to the MQTT broker.
    ///
    /// Returns a Delivery Token to track the progress of the operation.
    ///
    pub fn publish(&self, msg: Message) -> DeliveryToken {
        match self.try_publish(msg) {
            Ok(tok) => tok,
            Err(Error::Publish(rc, msg)) => DeliveryToken::from_error(msg, rc),
            _ => panic!("Unknown publish error"),
        }
    }

    /// Subscribes to a single topic.
    ///
    /// # Arguments
    ///
    /// `topic` The topic name
    /// `qos` The quality of service requested for messages
    ///
    pub fn subscribe<S>(&self, topic: S, qos: i32) -> SubscribeToken
    where
        S: Into<String>,
    {
        let ver = self.mqtt_version();
        let tok = Token::from_request(None, ServerRequest::Subscribe);
        let mut rsp_opts = ResponseOptions::new(ver, tok.clone());
        let topic = CString::new(topic.into()).unwrap();

        debug!("Subscribe to '{:?}' @ QOS {}", topic, qos);

        let rc = unsafe {
            ffi::MQTTAsync_subscribe(self.inner.handle, topic.as_ptr(), qos, &mut rsp_opts.copts)
        };

        if rc != 0 {
            let _ = unsafe { Token::from_raw(rsp_opts.copts.context) };
            return SubscribeToken::from_error(rc);
        }

        tok
    }

    /// Subscribes to a single topic with v5 options
    ///
    /// # Arguments
    ///
    /// `topic` The topic name
    /// `qos` The quality of service requested for messages
    /// `opts` Options for the subscription
    /// `props` MQTT v5 properties
    ///
    pub fn subscribe_with_options<S, T, P>(
        &self,
        topic: S,
        qos: i32,
        opts: T,
        props: P,
    ) -> SubscribeToken
    where
        S: Into<String>,
        T: Into<SubscribeOptions>,
        P: Into<Option<Properties>>,
    {
        debug_assert!(self.mqtt_version() >= ffi::MQTTVERSION_5);

        let tok = Token::from_request(None, ServerRequest::Subscribe);
        let mut rsp_opts = ResponseOptionsBuilder::new()
            .token(tok.clone())
            .subscribe_options(opts.into())
            .properties(props.into().unwrap_or_default())
            .finalize();

        let topic = CString::new(topic.into()).unwrap();

        debug!("Subscribe to '{:?}' @ QOS {}", topic, qos);

        let rc = unsafe {
            ffi::MQTTAsync_subscribe(self.inner.handle, topic.as_ptr(), qos, &mut rsp_opts.copts)
        };

        if rc != 0 {
            let _ = unsafe { Token::from_raw(rsp_opts.copts.context) };
            return SubscribeToken::from_error(rc);
        }

        tok
    }

    /// Subscribes to multiple topics simultaneously.
    ///
    /// # Arguments
    ///
    /// `topics` The collection of topic names
    /// `qos` The quality of service requested for messages
    ///
    pub fn subscribe_many<T>(&self, topics: &[T], qos: &[i32]) -> SubscribeManyToken
    where
        T: AsRef<str>,
    {
        let n = topics.len();

        let ver = self.mqtt_version();
        // TOOD: Make sure topics & qos are same length (or use min)
        let tok = Token::from_request(None, ServerRequest::SubscribeMany(n));
        let mut rsp_opts = ResponseOptions::new(ver, tok.clone());
        let topics = StringCollection::new(topics);

        debug!("Subscribe to '{:?}' @ QOS {:?}", topics, qos);

        let rc = unsafe {
            ffi::MQTTAsync_subscribeMany(
                self.inner.handle,
                n as c_int,
                topics.as_c_arr_mut_ptr(),
                qos.as_ptr(),
                &mut rsp_opts.copts,
            )
        };

        if rc != 0 {
            let _ = unsafe { Token::from_raw(rsp_opts.copts.context) };
            return SubscribeManyToken::from_error(rc);
        }

        tok
    }

    /// Subscribes to multiple topics simultaneously with options.
    ///
    /// # Arguments
    ///
    /// `topics` The collection of topic names
    /// `qos` The quality of service requested for messages
    /// `opts` Subscribe options (one per topic)
    /// `props` MQTT v5 properties
    ///
    pub fn subscribe_many_with_options<T, P>(
        &self,
        topics: &[T],
        qos: &[i32],
        opts: &[SubscribeOptions],
        props: P,
    ) -> SubscribeManyToken
    where
        T: AsRef<str>,
        P: Into<Option<Properties>>,
    {
        debug_assert!(self.mqtt_version() >= ffi::MQTTVERSION_5);

        let n = topics.len();
        // TOOD: Make sure topics & qos are same length (or use min)
        let tok = Token::from_request(None, ServerRequest::SubscribeMany(n));
        let mut rsp_opts = ResponseOptionsBuilder::new()
            .token(tok.clone())
            .subscribe_many_options(opts)
            .properties(props.into().unwrap_or_default())
            .finalize();

        let topics = StringCollection::new(topics);

        debug!(
            "Subscribe to '{:?}' @ QOS {:?} w/ opts: {:?}",
            topics, qos, opts
        );
        trace!("Subscribe call/response opts: {:?}", rsp_opts);

        let rc = unsafe {
            ffi::MQTTAsync_subscribeMany(
                self.inner.handle,
                n as c_int,
                topics.as_c_arr_mut_ptr(),
                qos.as_ptr(),
                &mut rsp_opts.copts,
            )
        };

        if rc != 0 {
            let _ = unsafe { Token::from_raw(rsp_opts.copts.context) };
            return SubscribeManyToken::from_error(rc);
        }

        tok
    }

    /// Unsubscribes from a single topic.
    ///
    /// # Arguments
    ///
    /// `topic` The topic to unsubscribe. It must match a topic from a
    ///         previous subscribe.
    ///
    pub fn unsubscribe<S>(&self, topic: S) -> Token
    where
        S: Into<String>,
    {
        let ver = self.mqtt_version();
        let tok = Token::from_request(None, ServerRequest::Unsubscribe);
        let mut rsp_opts = ResponseOptions::new(ver, tok.clone());
        let topic = CString::new(topic.into()).unwrap();

        debug!("Unsubscribe from '{:?}'", topic);

        let rc = unsafe {
            ffi::MQTTAsync_unsubscribe(self.inner.handle, topic.as_ptr(), &mut rsp_opts.copts)
        };

        if rc != 0 {
            let _ = unsafe { Token::from_raw(rsp_opts.copts.context) };
            return Token::from_error(rc);
        }

        tok
    }

    /// Unsubscribes from a single topic.
    ///
    /// # Arguments
    ///
    /// `topic` The topic to unsubscribe. It must match a topic from a
    ///         previous subscribe.
    /// `props` MQTT v5 properties for the unsubscribe.
    ///
    pub fn unsubscribe_with_options<S>(&self, topic: S, props: Properties) -> Token
    where
        S: Into<String>,
    {
        debug_assert!(self.mqtt_version() >= ffi::MQTTVERSION_5);

        let tok = Token::from_request(None, ServerRequest::Unsubscribe);
        let mut rsp_opts = ResponseOptionsBuilder::new()
            .token(tok.clone())
            .properties(props)
            .finalize();

        let topic = CString::new(topic.into()).unwrap();

        debug!("Unsubscribe from '{:?}'", topic);

        let rc = unsafe {
            ffi::MQTTAsync_unsubscribe(self.inner.handle, topic.as_ptr(), &mut rsp_opts.copts)
        };

        if rc != 0 {
            let _ = unsafe { Token::from_raw(rsp_opts.copts.context) };
            return Token::from_error(rc);
        }

        tok
    }

    /// Unsubscribes from multiple topics simultaneously.
    ///
    /// # Arguments
    ///
    /// `topic` The topics to unsubscribe. Each must match a topic from a
    ///         previous subscribe.
    ///
    pub fn unsubscribe_many<T>(&self, topics: &[T]) -> Token
    where
        T: AsRef<str>,
    {
        let ver = self.mqtt_version();

        let n = topics.len();
        let tok = Token::from_request(None, ServerRequest::UnsubscribeMany(n));
        let mut rsp_opts = ResponseOptions::new(ver, tok.clone());
        let topics = StringCollection::new(topics);

        debug!("Unsubscribe from '{:?}'", topics);

        let rc = unsafe {
            ffi::MQTTAsync_unsubscribeMany(
                self.inner.handle,
                n as c_int,
                topics.as_c_arr_mut_ptr(),
                &mut rsp_opts.copts,
            )
        };

        if rc != 0 {
            let _ = unsafe { Token::from_raw(rsp_opts.copts.context) };
            return Token::from_error(rc);
        }

        tok
    }

    /// Unsubscribes from multiple topics simultaneously.
    ///
    /// # Arguments
    ///
    /// `topic` The topics to unsubscribe. Each must match a topic from a
    ///         previous subscribe.
    /// `props` MQTT v5 properties for the unsubscribe.
    ///
    pub fn unsubscribe_many_with_options<T>(&self, topics: &[T], props: Properties) -> Token
    where
        T: AsRef<str>,
    {
        debug_assert!(self.mqtt_version() >= ffi::MQTTVERSION_5);

        let n = topics.len();
        let tok = Token::from_request(None, ServerRequest::UnsubscribeMany(n));
        let mut rsp_opts = ResponseOptionsBuilder::new()
            .token(tok.clone())
            .properties(props)
            .finalize();

        let topics = StringCollection::new(topics);

        debug!("Unsubscribe from '{:?}'", topics);

        let rc = unsafe {
            ffi::MQTTAsync_unsubscribeMany(
                self.inner.handle,
                n as c_int,
                topics.as_c_arr_mut_ptr(),
                &mut rsp_opts.copts,
            )
        };

        if rc != 0 {
            let _ = unsafe { Token::from_raw(rsp_opts.copts.context) };
            return Token::from_error(rc);
        }

        tok
    }

    /// Starts the client consuming messages for a blocking (non-async) app.
    ///
    /// This starts the client receiving messages and placing them into an
    /// mpsc queue. It returns the receiving-end of the queue for the
    /// application to get the messages.
    /// This can be called at any time after the client is created, but it
    /// should be called before subscribing to any topics, otherwise messages
    /// can be lost.
    //
    pub fn start_consuming(&self) -> Receiver<Option<Message>> {
        let (tx, rx) = channel::unbounded::<Option<Message>>();

        // Make sure at least the low-level connection_lost handler is in
        // place to notify us when the connection is lost (sends a 'None' to
        // the receiver).
        let inner: &InnerAsyncClient = &self.inner;

        unsafe {
            ffi::MQTTAsync_setConnectionLostCallback(
                inner.handle,
                inner as *const _ as *mut c_void,
                Some(AsyncClient::on_connection_lost),
            );
        }

        // Message callback just queues incoming messages.
        self.set_message_callback(move |_, msg| {
            if tx.send(msg).is_err() {
                error!("Consumer channel is closed.");
            }
        });

        rx
    }

    /// Stops the client from consuming messages.
    pub fn stop_consuming(&self) {
        self.remove_message_callback();
    }

    /// Creates a futures stream for consuming messages.
    ///
    /// This will install an internal callback to receive the incoming
    /// messages from the client, and return the receive side of the channel.
    /// The stream will stay open for the life of the client. If the client
    /// gets disconnected, it will insert `None` into the channel to signal
    /// the app about the disconnect.
    ///
    /// It's a best practice to open the stream _before_ connecting to the
    /// server. When using persistent (non-clean) sessions, messages could
    /// arriving as soon as the connection is made - even before the
    /// connect() call returns.
    pub fn get_stream(&mut self, buffer_sz: usize) -> AsyncReceiver<Option<Message>> {
        let (tx, rx) = async_channel::bounded(buffer_sz);

        // Make sure at least the low-level connection_lost handler is in
        // place to notify us when the connection is lost (sends a 'None' to
        // the receiver).
        let inner: &InnerAsyncClient = &self.inner;

        unsafe {
            ffi::MQTTAsync_setConnectionLostCallback(
                inner.handle,
                inner as *const _ as *mut c_void,
                Some(AsyncClient::on_connection_lost),
            );
        }

        self.set_message_callback(move |_, msg| {
            if let Err(err) = tx.try_send(msg) {
                if err.is_full() {
                    warn!("Input stream full. Losing messages");
                }
                else {
                    error!("Stream error: {:?}", err);
                }
            }
        });

        rx
    }

    /// Stops the client from streaming messages in.
    pub fn stop_stream(&self) {
        self.remove_message_callback();
    }

    /// Returns client ID used for client instance
    ///
    /// Client ID is returned as a rust String as set in a
    /// CreateOptionsBuilder for symmetry
    pub fn client_id(&self) -> String {
        self.inner.client_id.clone().into_string().unwrap()
    }
}

// The client is safe to send or share between threads.
unsafe impl Send for AsyncClient {}
unsafe impl Sync for AsyncClient {}

impl Drop for InnerAsyncClient {
    /// Drops the client by closing dpen all the underlying, dependent objects
    fn drop(&mut self) {
        // Destroy the underlying C client.
        if !self.handle.is_null() {
            unsafe {
                ffi::MQTTAsync_destroy(&mut self.handle as *mut *mut c_void);
            }
        }
    }
}

/////////////////////////////////////////////////////////////////////////////
//                              Unit Tests
/////////////////////////////////////////////////////////////////////////////

#[cfg(test)]
mod tests {
    use super::*;
    use crate::create_options::CreateOptionsBuilder;
    use std::sync::{Arc, Mutex, RwLock};
    use std::thread;

    // Makes sure than when a client is moved, the inner struct stayes at
    // the same address (on the heap) since that inner struct is used as
    // the context pointer for callbacks
    // GitHub Issue #17
    #[test]
    fn test_context() {
        let cli = AsyncClient::new("tcp://localhost:1883").unwrap();
        cli.set_message_callback(|_, _| {});

        // Get a context pointer to the inner struct
        let pctx = {
            let ctx: &InnerAsyncClient = &cli.inner;
            ctx as *const _ as *mut c_void
        };

        // Move the client, then get a context pointer to inner
        let new_cli = cli;
        let new_pctx = {
            let ctx: &InnerAsyncClient = &new_cli.inner;
            ctx as *const _ as *mut c_void
        };

        // They should match (inner didn't move)
        assert_eq!(pctx, new_pctx);
    }

    #[test]
    fn test_create() {
        let cli = AsyncClient::new("tcp://localhost:1883");
        assert!(cli.is_ok(), "Error in creating simple async client, do you have a running MQTT server on localhost:1883?");
    }

    #[test]
    fn test_with_client_id() {
        let options = CreateOptionsBuilder::new().client_id("test1").finalize();
        let client = AsyncClient::new(options);
        assert!(
            client.is_ok(),
            "Error in creating async client with client_id"
        );
        let tok = client.unwrap().connect(None);
        match tok.wait() {
            Ok(_) => (),
            Err(e) => println!("(Error) {}", e),
        }
    }

    // Test immutable user data without any lock
    #[test]
    fn test_user_data() {
        const DATA_STR: &str = "Hello world!";

        let cli = CreateOptionsBuilder::new()
            .server_uri("tcp://localhost:1883")
            .user_data(Box::new(DATA_STR))
            .create_client()
            .unwrap();

        let data = cli.user_data();

        assert!(data.is_some());
        assert_eq!(&DATA_STR, data.unwrap().downcast_ref::<&str>().unwrap());
    }

    // Test writable user data using a mutex.
    #[test]
    fn test_locked_user_data() {
        let data_vec = vec!["zero", "one", "two"];
        let data = Box::new(Mutex::new(data_vec));

        let cli = CreateOptionsBuilder::new()
            .server_uri("tcp://localhost:1883")
            .user_data(data)
            .create_client()
            .unwrap();

        let data = cli.user_data();
        assert!(data.is_some());

        let lock = data.unwrap().downcast_ref::<Mutex<Vec<&str>>>().unwrap();
        let mut v = lock.lock().unwrap();
        assert_eq!(3, v.len());
        assert_eq!("zero", v[0]);
        assert_eq!("one", v[1]);
        assert_eq!("two", v[2]);

        v.push("three");
        assert_eq!(4, v.len());
        assert_eq!("three", v[3]);
    }

    #[test]
    fn test_rw_user_data() {
        let data_vec = vec!["zero", "one", "two"];
        let data = Box::new(RwLock::new(data_vec));

        let cli = CreateOptionsBuilder::new()
            .server_uri("tcp://localhost:1883")
            .user_data(data)
            .create_client()
            .unwrap();

        let data = cli.user_data();
        assert!(data.is_some());
        let data = data.unwrap();

        let lock = data.downcast_ref::<RwLock<Vec<&str>>>().unwrap();
        // Try reading
        {
            let v = lock.read().unwrap();
            assert_eq!(3, v.len());
            assert_eq!("zero", v[0]);
            assert_eq!("one", v[1]);
            assert_eq!("two", v[2]);
        }

        // Now try writing
        {
            let mut v = lock.write().unwrap();
            v.push("three");
            assert_eq!(4, v.len());
            assert_eq!("three", v[3]);
        }
    }

    // Determine that a client can be sent across threads.
    // As long as it compiles, this indicates that AsyncClient implements
    // the Send trait.
    #[test]
    fn test_send() {
        let cli = AsyncClient::new("tcp://localhost:1883").unwrap();
        let thr = thread::spawn(move || {
            assert!(!cli.is_connected());
        });
        let _ = thr.join().unwrap();
    }

    // Determine that a client can be shared across threads using an Arc.
    // As long as it compiles, this indicates that AsyncClient implements the
    // Send trait.
    // This is a bit redundant with the previous test, but explicitly
    // addresses GitHub Issue #31.
    #[test]
    fn test_send_arc() {
        let cli = AsyncClient::new("tcp://localhost:1883").unwrap();

        let cli = Arc::new(cli);
        let cli2 = cli.clone();

        let thr = thread::spawn(move || {
            assert!(!cli.is_connected());
        });
        assert!(!cli2.is_connected());
        let _ = thr.join().unwrap();
    }

    #[test]
    fn test_get_client_id() {
        let c_id = "test_client_id_can_be_retrieved";
        let options = CreateOptionsBuilder::new().client_id(c_id).finalize();
        let client = AsyncClient::new(options);
        assert!(
            client.is_ok(),
            "Error in creating async client with client_id"
        );
        let retrieved = client.unwrap().client_id();
        assert_eq!(retrieved, c_id.to_string());
    }
}