use std::ptr;
use std::time::Duration;
use std::sync::{Arc, Mutex};
use std::ffi::CStr;
use std::os::raw::c_void;
use std::convert::Into;
use futures::{Future, Async};
use futures::task;
use futures::task::Task;
use futures_timer::FutureExt;
use ffi;
use async_client::{AsyncClient};
use types::ReasonCode;
use message::Message;
use server_response::{ServerRequest, ServerResponse};
use errors;
use errors::{MqttResult, MqttError};
/// Signature of the user callback stored in `TokenInner::on_success`.
/// It is invoked with the client and the message ID when an operation
/// completes with a zero return code.
pub type SuccessCallback = dyn Fn(&AsyncClient, u16) + 'static;
/// Signature of the user callback stored in `TokenInner::on_failure`.
/// It is invoked with the client, the message ID and the non-zero return
/// code when an operation fails.
pub type FailureCallback = dyn Fn(&AsyncClient, u16, i32) + 'static;
/// Signature for the `TokenInner::on_success5` slot; same shape as
/// `SuccessCallback`.
// NOTE(review): in this file the `on_success5` slot is only ever set to `None`.
pub type SuccessCallback5 = dyn Fn(&AsyncClient, u16) + 'static;
/// Signature for the `TokenInner::on_failure5` slot; same shape as
/// `FailureCallback`.
// NOTE(review): in this file the `on_failure5` slot is only ever set to `None`.
pub type FailureCallback5 = dyn Fn(&AsyncClient, u16, i32) + 'static;
/// Mutable completion state of a token.
/// `TokenInner` keeps one of these behind a mutex so that the completion
/// callbacks and the polling future can both access it.
#[derive(Debug,Default)]
pub(crate) struct TokenData {
/// `true` once the operation has finished, or when the data was built
/// directly from a return code.
complete: bool,
/// Message ID associated with the token; zero unless set from a message
/// or through `DeliveryToken::set_msgid`.
msg_id: i16,
/// Return code of the operation; zero means success.
ret_code: i32,
/// Reason code of the operation; in this file it is only written by the
/// v5 failure callback.
reason_code: ReasonCode,
/// Optional error text that accompanies a non-zero return code.
err_msg: Option<String>,
/// Server response handed to the future when the operation succeeds.
srvr_rsp: ServerResponse,
/// Task that polled the token while it was still incomplete; it is
/// notified when the operation completes.
task: Option<Task>,
}
impl TokenData {
    /// Creates pending (not yet complete) data carrying the given message ID.
    /// All other fields take their default values.
    pub fn from_message_id(msg_id: i16) -> TokenData {
        Self {
            msg_id,
            ..Self::default()
        }
    }

    /// Creates data that is already complete with return code `rc`.
    ///
    /// A non-zero `rc` also records the text returned by
    /// `errors::error_message` for that code; a zero `rc` records no message.
    pub fn from_error(rc: i32) -> TokenData {
        let err_msg = match rc {
            0 => None,
            _ => Some(String::from(errors::error_message(rc))),
        };
        Self {
            complete: true,
            ret_code: rc,
            err_msg,
            ..Self::default()
        }
    }

    /// Reports the current state in the shape expected by `Future::poll`.
    ///
    /// * Not complete: remembers the current task so it can be notified
    ///   later, and returns `NotReady`.
    /// * Complete with a zero return code: returns a clone of the stored
    ///   server response.
    /// * Complete with a non-zero return code: returns an error built from
    ///   the code and, when present, the stored error message.
    fn poll(&mut self) -> Result<Async<ServerResponse>, MqttError> {
        if !self.complete {
            self.task = Some(task::current());
            return Ok(Async::NotReady);
        }

        let rc = self.ret_code;
        if rc == 0 {
            return Ok(Async::Ready(self.srvr_rsp.clone()));
        }

        let err = match self.err_msg {
            Some(ref msg) => MqttError::from((rc, msg.clone())),
            None => MqttError::from(rc),
        };
        Err(err)
    }
}
/// Shared state behind a `Token` or `DeliveryToken`.
/// It is always handed out inside an `Arc` by the constructors below.
pub(crate) struct TokenInner {
/// Completion state, guarded so callbacks and pollers can both touch it.
lock: Mutex<TokenData>,
/// Client passed to the user callbacks; callbacks are skipped when `None`.
cli: Option<AsyncClient>,
/// Kind of request this token tracks; passed to `ServerResponse` when a
/// success payload is decoded.
req: ServerRequest,
/// User callback run when the operation completes with a zero return code.
on_success: Option<Box<SuccessCallback>>,
/// User callback run when the operation completes with a non-zero return code.
on_failure: Option<Box<FailureCallback>>,
/// Slot for a v5 success callback.
// NOTE(review): never set to anything but `None` in this file.
on_success5: Option<Box<SuccessCallback5>>,
/// Slot for a v5 failure callback.
// NOTE(review): never set to anything but `None` in this file.
on_failure5: Option<Box<FailureCallback5>>,
}
impl TokenInner {
pub fn new() -> Arc<TokenInner> {
Arc::new(TokenInner::default())
}
pub fn from_request(req: ServerRequest) -> Arc<TokenInner> {
Arc::new(
TokenInner {
req,
..TokenInner::default()
}
)
}
pub fn from_message(msg: &Message) -> Arc<TokenInner> {
Arc::new(
TokenInner {
lock: Mutex::new(TokenData::from_message_id(msg.cmsg.msgid as i16)),
..TokenInner::default()
}
)
}
pub fn from_client<FS,FF>(cli: &AsyncClient,
req: ServerRequest,
success_cb: FS,
failure_cb: FF) -> Arc<TokenInner>
where FS: Fn(&AsyncClient,u16) + 'static,
FF: Fn(&AsyncClient,u16,i32) + 'static
{
Arc::new(
TokenInner {
cli: Some(cli.clone()),
req,
on_success: Some(Box::new(success_cb)),
on_failure: Some(Box::new(failure_cb)),
..TokenInner::default()
}
)
}
pub fn from_error(rc: i32) -> Arc<TokenInner> {
Arc::new(
TokenInner {
lock: Mutex::new(TokenData::from_error(rc)),
..TokenInner::default()
}
)
}
pub(crate) unsafe extern "C" fn on_success(context: *mut c_void, rsp: *mut ffi::MQTTAsync_successData) {
debug!("Token success! {:?}, {:?}", context, rsp);
if context.is_null() { return }
let tok = Token::from_raw(context);
let msgid = if !rsp.is_null() { (*rsp).token as u16 } else { 0 };
tok.inner.on_complete(msgid, 0, None, rsp);
}
pub(crate) unsafe extern "C" fn on_failure(context: *mut c_void, rsp: *mut ffi::MQTTAsync_failureData) {
warn!("Token failure! {:?}, {:?}", context, rsp);
if context.is_null() { return }
let tok = Token::from_raw(context);
let mut msgid = 0;
let mut rc = -1;
let mut err_msg = None;
if let Some(rsp) = rsp.as_ref() {
msgid = rsp.token as u16;
rc = if rsp.code == 0 { -1 } else { rsp.code as i32 };
if !rsp.message.is_null() {
if let Ok(cmsg) = CStr::from_ptr(rsp.message).to_str() {
debug!("Token failure message: {:?}", cmsg);
err_msg = Some(cmsg.to_string());
}
}
}
tok.inner.on_complete(msgid, rc, err_msg, ptr::null_mut());
}
pub(crate) unsafe extern "C" fn on_success5(context: *mut c_void, rsp: *mut ffi::MQTTAsync_successData5) {
debug!("Token v5 success! {:?}, {:?}", context, rsp);
if context.is_null() { return }
let tok = Token::from_raw(context);
let msgid = if !rsp.is_null() { (*rsp).token as u16 } else { 0 };
tok.inner.on_complete5(msgid, 0, None, rsp);
}
pub(crate) unsafe extern "C" fn on_failure5(context: *mut c_void, rsp: *mut ffi::MQTTAsync_failureData5) {
warn!("Token v5 failure! {:?}, {:?}", context, rsp);
if context.is_null() { return }
let tok = Token::from_raw(context);
let mut msgid = 0;
let mut rc = -1;
let mut reason_code = ReasonCode::default();
let mut err_msg = None;
if let Some(rsp) = rsp.as_ref() {
msgid = rsp.token as u16;
rc = if rsp.code == 0 { -1 } else { rsp.code as i32 };
reason_code = ReasonCode::from_code(rsp.reasonCode);
if !rsp.message.is_null() {
if let Ok(cmsg) = CStr::from_ptr(rsp.message).to_str() {
debug!("Token failure message: {:?}", cmsg);
err_msg = Some(cmsg.to_string());
}
}
}
debug!("Token completed with code: {}", rc);
if let Some(ref cli) = tok.inner.cli {
if let Some(ref cb) = tok.inner.on_failure {
trace!("Invoking TokenInner::on_failure callback");
cb(cli, msgid, rc);
}
}
let mut data = tok.inner.lock.lock().unwrap();
data.complete = true;
data.ret_code = rc;
data.reason_code = reason_code;
data.err_msg = err_msg;
if let Some(rsp) = rsp.as_ref() {
data.srvr_rsp = ServerResponse::from_failure5(rsp);
}
if let Some(task) = data.task.as_ref() {
task.notify();
}
}
pub(crate) fn on_complete(&self, msgid: u16, rc: i32, err_msg: Option<String>,
rsp: *mut ffi::MQTTAsync_successData) {
debug!("Token completed with code: {}", rc);
if let Some(ref cli) = self.cli {
if rc == 0 {
if let Some(ref cb) = self.on_success {
trace!("Invoking TokenInner::on_success callback");
cb(cli, msgid);
}
}
else {
if let Some(ref cb) = self.on_failure {
trace!("Invoking TokenInner::on_failure callback");
cb(cli, msgid, rc);
}
}
}
let mut data = self.lock.lock().unwrap();
data.complete = true;
data.ret_code = rc;
data.err_msg = err_msg;
debug!("Expecting server response for: {:?}", self.req);
unsafe {
if let Some(rsp) = rsp.as_ref() {
data.srvr_rsp = ServerResponse::from_success(self.req, rsp);
}
}
debug!("Got response: {:?}", data.srvr_rsp);
if let Some(task) = data.task.as_ref() {
task.notify();
}
}
pub(crate) fn on_complete5(&self, msgid: u16, rc: i32, err_msg: Option<String>,
rsp: *mut ffi::MQTTAsync_successData5) {
debug!("Token completed with code: {}", rc);
if let Some(ref cli) = self.cli {
if rc == 0 {
if let Some(ref cb) = self.on_success {
trace!("Invoking TokenInner::on_success callback");
cb(cli, msgid);
}
}
else {
if let Some(ref cb) = self.on_failure {
trace!("Invoking TokenInner::on_failure callback");
cb(cli, msgid, rc);
}
}
}
let mut data = self.lock.lock().unwrap();
data.complete = true;
data.ret_code = rc;
data.err_msg = err_msg;
debug!("Expecting server response for: {:?}", self.req);
unsafe {
if let Some(rsp) = rsp.as_ref() {
data.srvr_rsp = ServerResponse::from_success5(self.req, rsp);
}
}
debug!("Got response: {:?}", data.srvr_rsp);
if let Some(task) = data.task.as_ref() {
task.notify();
}
}
}
impl Default for TokenInner {
    /// Builds a pending token state: default completion data, no client,
    /// no request kind, and no callbacks in any slot.
    fn default() -> Self {
        let lock = Mutex::new(TokenData::default());
        Self {
            lock,
            cli: None,
            req: ServerRequest::None,
            on_success: None,
            on_failure: None,
            on_success5: None,
            on_failure5: None,
        }
    }
}
/// Handle to the shared state of an asynchronous operation.
/// Cloning a token clones the `Arc`, so every clone refers to the same
/// underlying `TokenInner`.
#[derive(Clone)]
pub struct Token {
/// Reference-counted shared state for the operation.
pub(crate) inner: Arc<TokenInner>,
}
impl Token {
pub fn new() -> Token {
Token { inner: TokenInner::new() }
}
pub fn from_request(req: ServerRequest) -> Token {
Token { inner: TokenInner::from_request(req) }
}
pub fn from_client<FS,FF>(cli: &AsyncClient,
req: ServerRequest,
success_cb: FS,
failure_cb: FF) -> Token
where FS: Fn(&AsyncClient,u16) + 'static,
FF: Fn(&AsyncClient,u16,i32) + 'static
{
Token { inner: TokenInner::from_client(cli, req, success_cb, failure_cb) }
}
pub fn from_error(rc: i32) -> Token {
Token { inner: TokenInner::from_error(rc) }
}
pub fn from_success() -> Token {
Token { inner: TokenInner::from_error(ffi::MQTTASYNC_SUCCESS as i32) }
}
pub(crate) unsafe fn from_raw(ptr: *mut c_void) -> Token {
Token { inner: Arc::from_raw(ptr as *mut TokenInner) }
}
pub(crate) fn into_raw(self) -> *mut c_void {
Arc::into_raw(self.inner) as *mut c_void
}
pub fn wait_for(self, dur: Duration) -> MqttResult<ServerResponse> {
self.timeout(dur).wait()
}
}
// Manually asserts that a `Token` may be moved to another thread.
// NOTE(review): `TokenInner` stores boxed callbacks whose trait-object types
// carry no `Send` bound, so the compiler cannot check this claim. The callbacks
// are only invoked from the completion paths in this file; confirm that holds
// for every caller before relying on cross-thread use.
unsafe impl Send for Token {}
impl Future for Token {
    type Item = ServerResponse;
    type Error = MqttError;

    /// Polls the operation for completion.
    ///
    /// Returns `NotReady` (after recording the current task for a later
    /// wake-up) while the operation is pending, the server response once it
    /// has completed with a zero return code, and an `MqttError` built from
    /// the return code and optional message otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex has been poisoned.
    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        // The completion logic lives in `TokenData::poll`; delegate to it
        // instead of keeping a second copy here.
        let mut data = self.inner.lock.lock().unwrap();
        data.poll()
    }
}
// Operation-specific names for `Token`. Each one is a plain alias of the same
// type, so they are interchangeable as far as the compiler is concerned.
/// Alias of `Token`, named for connect operations.
pub type ConnectToken = Token;
/// Alias of `Token`, named for subscribe operations.
pub type SubscribeToken = Token;
/// Alias of `Token`, named for multi-topic subscribe operations.
pub type SubscribeManyToken = Token;
/// Alias of `Token`, named for unsubscribe operations.
pub type UnsubscribeToken = Token;
/// Alias of `Token`, named for multi-topic unsubscribe operations.
pub type UnsubscribeManyToken = Token;
/// Token for a message delivery.
/// It pairs the shared completion state with the message being tracked, and
/// resolves to `()` rather than to a server response.
#[derive(Clone)]
pub struct DeliveryToken {
/// Reference-counted shared state for the operation.
pub(crate) inner: Arc<TokenInner>,
/// The message this token tracks.
msg: Message,
}
impl DeliveryToken {
    /// Creates a pending delivery token for `msg`, taking the initial
    /// message ID from the message itself.
    pub fn new(msg: Message) -> DeliveryToken {
        let inner = TokenInner::from_message(&msg);
        DeliveryToken { inner, msg }
    }

    /// Creates a delivery token for `msg` that is already complete with
    /// return code `rc`.
    pub fn from_error(msg: Message, rc: i32) -> DeliveryToken {
        let inner = TokenInner::from_error(rc);
        DeliveryToken { inner, msg }
    }

    /// Overwrites the message ID stored in the shared state.
    pub(crate) fn set_msgid(&self, msg_id: i16) {
        self.inner.lock.lock().unwrap().msg_id = msg_id;
    }

    /// Returns the message this token tracks.
    pub fn message(&self) -> &Message {
        &self.msg
    }

    /// Blocks the calling thread until the delivery completes or `dur`
    /// elapses.
    pub fn wait_for(self, dur: Duration) -> MqttResult<()> {
        let timed = self.timeout(dur);
        timed.wait()
    }
}
// Manually asserts that a `DeliveryToken` may be moved to another thread.
// NOTE(review): the shared `TokenInner` stores boxed callbacks whose
// trait-object types carry no `Send` bound, and whether `Message` is safe to
// send is not visible in this file. Confirm both before relying on
// cross-thread use.
unsafe impl Send for DeliveryToken {}
/// Converts a delivery token into the message it was tracking.
///
/// Implemented as `From` so that both `Message::from(tok)` and `tok.into()`
/// are available; the standard blanket impl supplies the `Into` side.
impl From<DeliveryToken> for Message {
    fn from(tok: DeliveryToken) -> Message { tok.msg }
}
/// Converts a delivery token into a plain `Token` that shares the same
/// underlying state; the tracked message is dropped.
///
/// Implemented as `From` so that both `Token::from(tok)` and `tok.into()`
/// are available; the standard blanket impl supplies the `Into` side.
impl From<DeliveryToken> for Token {
    fn from(tok: DeliveryToken) -> Token {
        Token { inner: tok.inner }
    }
}
impl Future for DeliveryToken {
    type Item = ();
    type Error = MqttError;

    /// Polls the delivery for completion.
    ///
    /// Returns `NotReady` (after recording the current task for a later
    /// wake-up) while the delivery is pending, `Ready(())` once it has
    /// completed with a zero return code, and an `MqttError` built from the
    /// return code and optional message otherwise.
    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        let mut state = self.inner.lock.lock().unwrap();

        if !state.complete {
            state.task = Some(task::current());
            return Ok(Async::NotReady);
        }

        let rc = state.ret_code;
        if rc == 0 {
            return Ok(Async::Ready(()));
        }

        let err = match state.err_msg {
            Some(ref msg) => MqttError::from((rc, msg.clone())),
            None => MqttError::from(rc),
        };
        Err(err)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// A freshly created token must not report completion.
    #[test]
    fn test_new() {
        let tok = Token::new();
        let data = tok.inner.lock.lock().unwrap();
        assert!(!data.complete);
    }

    /// A delivery token picks up the message ID from the message and starts
    /// out incomplete.
    #[test]
    fn test_from_message() {
        const MSG_ID: i16 = 42;
        let mut msg = Message::new("hello", "Hi there", 1);
        msg.cmsg.msgid = MSG_ID as i32;

        let tok = DeliveryToken::new(msg);
        let data = tok.inner.lock.lock().unwrap();
        assert!(!data.complete);
        assert_eq!(MSG_ID, data.msg_id);
    }

    /// A token built from an error code is already complete and carries
    /// that code.
    #[test]
    fn test_from_error() {
        const ERR_CODE: i32 = -42;

        let tok = Token::from_error(ERR_CODE);
        let data = tok.inner.lock.lock().unwrap();
        assert!(data.complete);
        assert_eq!(ERR_CODE, data.ret_code);
    }

    /// Clones of a token share the same inner state, so their raw pointers
    /// are equal.
    #[test]
    fn test_token_clones() {
        let tok1 = Token::new();
        let tok2 = tok1.clone();

        let p1 = Token::into_raw(tok1);
        let p2 = Token::into_raw(tok2);
        assert_eq!(p1, p2);

        // Take both references back so the shared state is released.
        unsafe {
            let _ = Token::from_raw(p1);
            let _ = Token::from_raw(p2);
        }
    }

    /// A token can be waited on from another thread and resolves
    /// successfully once it is completed with a zero return code.
    #[test]
    fn test_token_send() {
        let tok = Token::new();
        let tok2 = tok.clone();

        let thr = thread::spawn(move || {
            tok.wait()
        });

        tok2.inner.on_complete(0, 0, None, ptr::null_mut());

        // Check the outcome rather than discarding it, so a wrong result
        // fails the test.
        let res = thr.join().unwrap();
        assert!(res.is_ok());
    }
}