use std::{borrow::Cow, convert::From, ffi::CString, fmt, os::raw::c_void, pin::Pin, slice};
use crate::{ffi, properties::Properties, to_c_bool};
/// An MQTT message: a C message struct paired with the Rust-owned data
/// that the C struct refers to.
///
/// `from_data` sets `cmsg.payload` to point at the bytes of `data.payload`,
/// so the two fields must always be kept together and consistent.
#[derive(Debug)]
pub struct Message {
/// The C message struct (`ffi::MQTTAsync_message`) describing this message.
pub(crate) cmsg: ffi::MQTTAsync_message,
/// The pinned, heap-allocated topic, payload and properties backing `cmsg`.
pub(crate) data: Pin<Box<MessageData>>,
}
/// The Rust-owned data behind a `Message`: its topic, payload bytes and
/// properties.
#[derive(Debug, Default, Clone)]
pub(crate) struct MessageData {
/// The topic, stored as a NUL-terminated C string.
pub(crate) topic: CString,
/// The raw payload bytes.
pub(crate) payload: Vec<u8>,
/// The properties attached to the message.
pub(crate) props: Properties,
}
impl MessageData {
    /// Builds the owned data for a message from a topic and a payload.
    ///
    /// The properties start out as `Properties::default()`.
    ///
    /// # Panics
    ///
    /// Panics if `topic` contains an interior NUL byte, because it then
    /// cannot be converted into a `CString`.
    pub(crate) fn new<S, V>(topic: S, payload: V) -> Self
    where
        S: Into<String>,
        V: Into<Vec<u8>>,
    {
        let topic = CString::new(topic.into()).unwrap();
        let payload: Vec<u8> = payload.into();
        let props = Properties::default();

        Self {
            topic,
            payload,
            props,
        }
    }
}
impl Message {
    /// Creates a message for `topic` carrying `payload` at the requested `qos`.
    ///
    /// Every other field of the C struct is left at its default value.
    ///
    /// # Panics
    ///
    /// Panics if `topic` contains an interior NUL byte.
    pub fn new<S, V>(topic: S, payload: V, qos: i32) -> Self
    where
        S: Into<String>,
        V: Into<Vec<u8>>,
    {
        let cmsg = ffi::MQTTAsync_message {
            qos,
            ..ffi::MQTTAsync_message::default()
        };
        let data = MessageData::new(topic, payload);
        Self::from_data(cmsg, data)
    }

    /// Creates a message like [`Message::new`], but with the retained flag set.
    ///
    /// # Panics
    ///
    /// Panics if `topic` contains an interior NUL byte.
    pub fn new_retained<S, V>(topic: S, payload: V, qos: i32) -> Self
    where
        S: Into<String>,
        V: Into<Vec<u8>>,
    {
        let cmsg = ffi::MQTTAsync_message {
            qos,
            retained: 1,
            ..ffi::MQTTAsync_message::default()
        };
        let data = MessageData::new(topic, payload);
        Self::from_data(cmsg, data)
    }

    /// Assembles a message from a C struct and the owned data, fixing up the
    /// C struct so that it refers to that data.
    ///
    /// The payload pointer, payload length and properties in `cmsg` are
    /// overwritten; all other fields of `cmsg` are kept as passed in.
    fn from_data(mut cmsg: ffi::MQTTAsync_message, data: MessageData) -> Self {
        // Move the data to its final heap location before taking any pointers
        // so that the pointers stored in `cmsg` describe the data we keep.
        let data = Box::pin(data);
        cmsg.payload = data.payload.as_ptr() as *const _ as *mut c_void;
        // NOTE(review): a payload longer than `i32::MAX` bytes would wrap in
        // this cast; the C struct only has room for an `i32` length.
        cmsg.payloadlen = data.payload.len() as i32;
        cmsg.properties = data.props.cprops;
        Self { cmsg, data }
    }

    /// Creates a message from a topic and a C message struct by copying the
    /// payload and properties that the C struct describes.
    ///
    /// A null payload pointer or a non-positive payload length yields an
    /// empty payload.
    ///
    /// The caller must make sure that, when `cmsg.payload` is non-null, it
    /// points to at least `cmsg.payloadlen` readable bytes.
    pub fn from_c_parts(topic: CString, cmsg: &ffi::MQTTAsync_message) -> Self {
        // A negative length must never be turned into a `usize`: it would
        // become an enormous slice length and read far out of bounds.
        let payload = if cmsg.payload.is_null() || cmsg.payloadlen <= 0 {
            Vec::new()
        }
        else {
            let len = cmsg.payloadlen as usize;
            // SAFETY: the pointer was checked to be non-null and the length
            // to be positive; the caller guarantees that `payload` refers to
            // `payloadlen` readable bytes. The bytes are copied immediately,
            // so the borrowed slice does not outlive this expression.
            unsafe { slice::from_raw_parts(cmsg.payload as *const u8, len) }.to_vec()
        };

        let data = MessageData {
            topic,
            payload,
            props: Properties::from_c_struct(&cmsg.properties),
        };
        Self::from_data(*cmsg, data)
    }

    /// Returns the topic of the message.
    ///
    /// # Panics
    ///
    /// Panics if the stored topic is not valid UTF-8.
    pub fn topic(&self) -> &str {
        self.data
            .topic
            .to_str()
            .expect("paho.mqtt.c already validated utf8")
    }

    /// Returns the payload of the message as raw bytes.
    pub fn payload(&self) -> &[u8] {
        self.data.payload.as_slice()
    }

    /// Returns the payload as text, replacing any invalid UTF-8 sequences
    /// with the Unicode replacement character.
    pub fn payload_str(&self) -> Cow<'_, str> {
        String::from_utf8_lossy(&self.data.payload)
    }

    /// Returns the quality of service value stored in the message.
    pub fn qos(&self) -> i32 {
        self.cmsg.qos
    }

    /// Returns whether the retained flag of the message is set.
    pub fn retained(&self) -> bool {
        self.cmsg.retained != 0
    }

    /// Returns the properties attached to the message.
    pub fn properties(&self) -> &Properties {
        &self.data.props
    }
}
impl Default for Message {
    /// Creates a message from the default C struct and default (empty)
    /// message data.
    fn default() -> Self {
        let cmsg = ffi::MQTTAsync_message::default();
        let data = MessageData::default();
        Self::from_data(cmsg, data)
    }
}
impl Clone for Message {
fn clone(&self) -> Self {
Self::from_data(self.cmsg, (*self.data).clone())
}
}
// `Message` stores raw pointers inside `cmsg` (for example `cmsg.payload`,
// which `from_data` points at the bytes of `data.payload`), so `Send` and
// `Sync` are not derived automatically and are asserted manually here.
// NOTE(review): this relies on every pointer in `cmsg` referring only to
// memory owned by the message's own `data` - confirm for `cmsg.properties`.
unsafe impl Send for Message {}
unsafe impl Sync for Message {}
impl<'a, 'b> From<(&'a str, &'b [u8])> for Message {
    /// Creates a message from a `(topic, payload)` pair, leaving every
    /// field of the C struct at its default value.
    ///
    /// # Panics
    ///
    /// Panics if the topic contains an interior NUL byte.
    fn from(parts: (&'a str, &'b [u8])) -> Self {
        let (topic, payload) = parts;
        let cmsg = ffi::MQTTAsync_message::default();
        let data = MessageData::new(topic, payload);
        Self::from_data(cmsg, data)
    }
}
impl<'a, 'b> From<(&'a str, &'b [u8], i32, bool)> for Message {
    /// Creates a message from a `(topic, payload, qos, retained)` tuple.
    ///
    /// # Panics
    ///
    /// Panics if the topic contains an interior NUL byte.
    fn from(parts: (&'a str, &'b [u8], i32, bool)) -> Self {
        let (topic, payload, qos, retained) = parts;
        let cmsg = ffi::MQTTAsync_message {
            qos,
            retained: to_c_bool(retained),
            ..Default::default()
        };
        let data = MessageData::new(topic, payload);
        Self::from_data(cmsg, data)
    }
}
impl fmt::Display for Message {
    /// Writes the message as `topic: payload`, with the payload rendered as
    /// lossy UTF-8 text.
    ///
    /// Returns `fmt::Error` if the stored topic is not valid UTF-8.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let topic = self.data.topic.to_str().map_err(|_| fmt::Error)?;
        write!(f, "{}: {}", topic, self.payload_str())
    }
}
/// Builder that collects the parts of a `Message` and assembles them in
/// `finalize`.
#[derive(Debug)]
pub struct MessageBuilder {
/// The topic for the message.
topic: String,
/// The payload bytes for the message.
payload: Vec<u8>,
/// The quality of service value for the message.
qos: i32,
/// Whether the retained flag should be set on the message.
retained: bool,
/// The properties to attach to the message.
props: Properties,
}
impl MessageBuilder {
    /// Creates a builder with an empty topic, an empty payload, a QoS of
    /// zero, the retained flag cleared, and default properties.
    pub fn new() -> Self {
        Self {
            topic: String::new(),
            payload: Vec::new(),
            qos: 0,
            retained: false,
            props: Properties::default(),
        }
    }

    /// Sets the topic for the message.
    pub fn topic<T>(self, topic: T) -> Self
    where
        T: Into<String>,
    {
        Self {
            topic: topic.into(),
            ..self
        }
    }

    /// Sets the payload for the message.
    pub fn payload<V>(self, payload: V) -> Self
    where
        V: Into<Vec<u8>>,
    {
        Self {
            payload: payload.into(),
            ..self
        }
    }

    /// Sets the quality of service value for the message.
    pub fn qos(self, qos: i32) -> Self {
        Self { qos, ..self }
    }

    /// Sets whether the message should carry the retained flag.
    pub fn retained(self, retained: bool) -> Self {
        Self { retained, ..self }
    }

    /// Sets the properties to attach to the message.
    pub fn properties(self, props: Properties) -> Self {
        Self { props, ..self }
    }

    /// Consumes the builder and produces the finished message.
    ///
    /// # Panics
    ///
    /// Panics if the topic contains an interior NUL byte, because it then
    /// cannot be converted into a `CString`.
    pub fn finalize(self) -> Message {
        let Self {
            topic,
            payload,
            qos,
            retained,
            props,
        } = self;

        let cmsg = ffi::MQTTAsync_message {
            qos,
            retained: to_c_bool(retained),
            ..ffi::MQTTAsync_message::default()
        };
        let topic = CString::new(topic).unwrap();

        Message::from_data(
            cmsg,
            MessageData {
                topic,
                payload,
                props,
            },
        )
    }
}
impl Default for MessageBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::{os::raw::c_char, thread};

    // The identifier expected in the `struct_id` field of the C message struct.
    const STRUCT_ID: [c_char; 4] = [
        b'M' as c_char,
        b'Q' as c_char,
        b'T' as c_char,
        b'M' as c_char,
    ];
    // The version expected in the `struct_version` field of the C message struct.
    const STRUCT_VERSION: i32 = ffi::MESSAGE_STRUCT_VERSION;

    // Shared fixture values used by the tests below.
    const TOPIC: &str = "test";
    const PAYLOAD: &[u8] = b"Hello world";
    const QOS: i32 = 2;
    const RETAINED: bool = true;

    // A default message should carry the default C struct header, an empty
    // topic and an empty payload, with the C struct describing that payload.
    #[test]
    fn test_default() {
        let msg = Message::default();

        assert_eq!(STRUCT_ID, msg.cmsg.struct_id);
        assert_eq!(STRUCT_VERSION, msg.cmsg.struct_version);

        assert!(msg.topic().is_empty());
        assert!(msg.payload().is_empty());

        assert_eq!(0, msg.cmsg.payloadlen);
        assert_eq!(msg.data.payload.as_ptr() as *mut c_void, msg.cmsg.payload);
    }

    // `Message::new` should store the topic, payload and QoS, and leave the
    // retained flag cleared.
    #[test]
    fn test_new() {
        let msg = Message::new(TOPIC, PAYLOAD, QOS);

        assert_eq!(STRUCT_ID, msg.cmsg.struct_id);
        assert_eq!(STRUCT_VERSION, msg.cmsg.struct_version);

        assert_eq!(TOPIC, msg.data.topic.to_str().unwrap());
        assert_eq!(PAYLOAD, msg.data.payload.as_slice());

        assert_eq!(msg.data.payload.len() as i32, msg.cmsg.payloadlen);
        assert_eq!(msg.data.payload.as_ptr() as *mut c_void, msg.cmsg.payload);

        assert_eq!(QOS, msg.cmsg.qos);
        assert!(msg.cmsg.retained == 0);
    }

    // Conversion from a `(topic, payload)` tuple.
    #[test]
    fn test_from_2_tuple() {
        let msg = Message::from((TOPIC, PAYLOAD));

        assert_eq!(TOPIC, msg.data.topic.to_str().unwrap());
        assert_eq!(PAYLOAD, msg.data.payload.as_slice());

        assert_eq!(msg.data.payload.len() as i32, msg.cmsg.payloadlen);
        assert_eq!(msg.data.payload.as_ptr() as *mut c_void, msg.cmsg.payload);
    }

    // Conversion from a `(topic, payload, qos, retained)` tuple.
    #[test]
    fn test_from_4_tuple() {
        let msg = Message::from((TOPIC, PAYLOAD, QOS, RETAINED));

        assert_eq!(TOPIC, msg.data.topic.to_str().unwrap());
        assert_eq!(PAYLOAD, msg.data.payload.as_slice());

        assert_eq!(msg.data.payload.len() as i32, msg.cmsg.payloadlen);
        assert_eq!(msg.data.payload.as_ptr() as *mut c_void, msg.cmsg.payload);

        assert_eq!(QOS, msg.cmsg.qos);
        assert!(msg.cmsg.retained != 0);
    }

    // A builder with nothing set should produce the default C struct header
    // and an empty topic and payload.
    #[test]
    fn test_builder_default() {
        let msg = MessageBuilder::new().finalize();
        let cmsg = ffi::MQTTAsync_message::default();

        assert_eq!(STRUCT_ID, cmsg.struct_id);
        assert_eq!(STRUCT_VERSION, cmsg.struct_version);

        assert_eq!(cmsg.struct_id, msg.cmsg.struct_id);
        assert_eq!(cmsg.struct_version, msg.cmsg.struct_version);

        assert_eq!(0, msg.data.topic.as_bytes().len());
        assert_eq!(&[] as &[u8], msg.data.topic.as_bytes());
        assert_eq!(&[] as &[u8], msg.data.payload.as_slice());
    }

    // The builder should pass the topic through to the message.
    #[test]
    fn test_builder_topic() {
        let msg = MessageBuilder::new().topic(TOPIC).finalize();

        assert_eq!(TOPIC, msg.data.topic.to_str().unwrap());
        assert_eq!(TOPIC, msg.topic());
    }

    // The builder should pass the payload through to the message and the
    // C struct should describe the stored bytes.
    #[test]
    fn test_builder_payload() {
        let msg = MessageBuilder::new().payload(PAYLOAD).finalize();

        assert_eq!(PAYLOAD, msg.data.payload.as_slice());
        assert_eq!(PAYLOAD, msg.payload());

        assert_eq!(msg.data.payload.len() as i32, msg.cmsg.payloadlen);
        assert_eq!(msg.data.payload.as_ptr() as *mut c_void, msg.cmsg.payload);
    }

    // The builder should pass the QoS through to the message.
    #[test]
    fn test_builder_qos() {
        let msg = MessageBuilder::new().qos(QOS).finalize();

        assert_eq!(QOS, msg.cmsg.qos);
        assert_eq!(QOS, msg.qos());
    }

    // The builder should pass the retained flag through in both states.
    #[test]
    fn test_builder_retained() {
        let msg = MessageBuilder::new().retained(false).finalize();
        assert!(msg.cmsg.retained == 0);

        let msg = MessageBuilder::new().retained(true).finalize();
        assert!(msg.cmsg.retained != 0);
        assert!(msg.retained());
    }

    // Moving a message must keep the C struct pointing at the message's
    // own payload bytes.
    #[test]
    fn test_assign() {
        let org_msg = MessageBuilder::new()
            .topic(TOPIC)
            .payload(PAYLOAD)
            .qos(QOS)
            .retained(RETAINED)
            .finalize();

        let msg = org_msg;

        assert_eq!(TOPIC, msg.data.topic.to_str().unwrap());
        assert_eq!(PAYLOAD, msg.data.payload.as_slice());

        assert_eq!(msg.data.payload.len() as i32, msg.cmsg.payloadlen);
        assert_eq!(msg.data.payload.as_ptr() as *mut c_void, msg.cmsg.payload);

        assert_eq!(QOS, msg.cmsg.qos);
        assert!(msg.cmsg.retained != 0);
    }

    // A clone must stay fully usable after the original is dropped, with its
    // C struct pointing at the clone's own payload bytes.
    #[test]
    fn test_clone() {
        let msg = {
            // The original goes out of scope at the end of this block.
            let org_msg = MessageBuilder::new()
                .topic(TOPIC)
                .payload(PAYLOAD)
                .qos(QOS)
                .retained(RETAINED)
                .finalize();
            org_msg.clone()
        };

        assert_eq!(TOPIC, msg.data.topic.to_str().unwrap());
        assert_eq!(PAYLOAD, msg.data.payload.as_slice());

        assert_eq!(msg.data.payload.len() as i32, msg.cmsg.payloadlen);
        assert_eq!(msg.data.payload.as_ptr() as *mut c_void, msg.cmsg.payload);

        assert_eq!(QOS, msg.cmsg.qos);
        assert!(msg.cmsg.retained != 0);
    }

    // A message can be moved into another thread and read there.
    #[test]
    fn test_message_send() {
        let msg = Message::new(TOPIC, PAYLOAD, QOS);

        let thr = thread::spawn(move || {
            assert_eq!(TOPIC, msg.data.topic.to_str().unwrap());
            assert_eq!(PAYLOAD, msg.data.payload.as_slice());
            assert_eq!(QOS, msg.qos());
        });
        thr.join().unwrap();
    }
}