#![warn(missing_docs)]
#![deny(warnings)]
use futures::{SinkExt, StreamExt};
use std::{collections::HashMap, sync::Arc};
pub use rumqttc::v5::mqttbytes::v5 as mqttbytes;
pub use rumqttc::v5::mqttbytes::QoS;
/// Convenience alias for `std::result::Result` whose error type is [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Errors reported by the public client API of this file.
#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub enum Error {
    /// A subscription for the named topic is already registered
    /// (returned by `Client::create_raw_subscription`).
    #[error("Already subscribed to topic {0}.")]
    AlreadySubscribed(String),

    /// No subscription is registered for the named topic
    /// (returned by `Client::get_raw_subscription`).
    #[error("Not subscribed to topic {0}.")]
    NotSubscribed(String),

    /// Sending a request over the internal request channel failed.
    #[error("An internal error occured when sending a request {0}.")]
    RequestChannel(#[from] futures::channel::mpsc::SendError),

    /// A `std::sync::Mutex` was poisoned; carries the display text of the
    /// original `PoisonError`.
    #[error("Could not lock a mutex: {0}.")]
    MutexPoison(String),

    /// Error converted from a `rumqttc::v5::ClientError`.
    #[error("An error occured in the communication middleware {0}.")]
    Communication(#[from] rumqttc::v5::ClientError),

    /// JSON serialization failed (only with the `json` feature).
    #[cfg(feature = "json")]
    #[error("Error during json serialization {0}")]
    JsonSerialisation(#[from] serde_json::Error),

    /// CBOR serialization failed (only with the `cbor` feature).
    #[cfg(feature = "cbor")]
    #[error("Error during cbor serialization {0}")]
    CborSerialisation(#[from] ciborium::ser::Error<std::io::Error>),
}
impl<T> From<std::sync::PoisonError<std::sync::MutexGuard<'_, T>>> for Error
{
fn from(value: std::sync::PoisonError<std::sync::MutexGuard<'_, T>>) -> Self
{
Error::MutexPoison(value.to_string())
}
}
/// Errors raised while processing MQTT events inside the driver future.
///
/// These are only logged by the event loop and never surface in the public API.
#[derive(thiserror::Error, Debug)]
enum InternalError {
    /// A publish arrived on a topic for which no broadcast sender is registered.
    #[error("No channel for topic {0}")]
    NoChannelFor(String),

    /// Error converted from a `rumqttc::v5::ConnectionError`.
    #[error("An error occured in the communication middleware {0}.")]
    Communication(#[from] rumqttc::v5::ConnectionError),

    /// Broadcasting a message failed; carries the display text of the
    /// `async_broadcast::SendError`.
    #[error("Error when broadcasting message {0}")]
    AsyncBroadcast(String),

    /// The topic of an incoming publish was not valid UTF-8.
    #[error("UTF-8 error {0}")]
    Utf8Error(#[from] std::str::Utf8Error),
}
impl<T> From<async_broadcast::SendError<T>> for InternalError
{
fn from(value: async_broadcast::SendError<T>) -> Self
{
Self::AsyncBroadcast(value.to_string())
}
}
/// A message with a payload of type `T` and optional MQTT v5 publish properties.
#[derive(Clone)]
pub struct Message<T> {
    /// The message payload.
    pub payload: T,
    /// Publish properties attached to the message, if any.
    pub properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
}
/// A [`Message`] whose payload is the undecoded byte buffer.
pub type RawMessage = Message<bytes::Bytes>;
/// Requests sent from a [`Client`] to the request loop created by
/// `ClientBuilder::start`.
enum Request {
    /// Register `sender` for `topic` and subscribe to that topic.
    Subscribe {
        /// Topic to subscribe to; also the key under which `sender` is stored.
        topic: String,
        /// Broadcast sender that receives the messages published on `topic`.
        sender: async_broadcast::Sender<RawMessage>,
        /// Quality of service requested for the subscription.
        qos: QoS,
    },
    /// Publish `payload` on `topic`.
    Publish {
        /// Topic to publish on.
        topic: String,
        /// Quality of service used for the publish.
        qos: QoS,
        /// Value of the MQTT retain flag.
        retain: bool,
        /// Raw payload bytes.
        payload: bytes::Bytes,
        /// Publish properties; when `None` the message is published without
        /// properties.
        properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
    },
}
/// Builder for a [`Client`], obtained from `Client::build` and consumed by
/// `ClientBuilder::start`.
pub struct ClientBuilder {
    /// Connection options handed to `rumqttc::v5::AsyncClient::new`.
    mqttoptions: rumqttc::v5::MqttOptions,
    /// Capacity value configured through `ClientBuilder::set_capacity`.
    cap: usize,
}
impl ClientBuilder
{
pub fn set_capacity(mut self, cap: usize) -> Self
{
self.cap = cap;
self
}
async fn handle_events(
event: std::result::Result<rumqttc::v5::Event, rumqttc::v5::ConnectionError>,
senders: &Arc<futures::lock::Mutex<HashMap<String, async_broadcast::Sender<RawMessage>>>>,
) -> std::result::Result<(), InternalError>
{
match event?
{
rumqttc::v5::Event::Incoming(packet) => match packet
{
rumqttc::v5::Incoming::Publish(pub_msg) =>
{
let topic_str = std::str::from_utf8(pub_msg.topic.as_ref())?;
let senders = senders.lock().await;
match senders.get(topic_str)
{
Some(s) =>
{
s.broadcast(RawMessage {
payload: pub_msg.payload,
properties: pub_msg.properties,
})
.await?;
}
None => Err(InternalError::NoChannelFor(topic_str.to_string()))?,
}
}
rumqttc::v5::Incoming::ConnAck(_) =>
{}
rumqttc::v5::Incoming::SubAck(_) =>
{}
rumqttc::v5::Incoming::PubAck(_) =>
{}
rumqttc::v5::Incoming::PubRel(_) =>
{}
rumqttc::v5::Incoming::PubRec(_) =>
{}
rumqttc::v5::Incoming::PubComp(_) =>
{}
rumqttc::v5::Incoming::PingResp(_) =>
{}
_ =>
{
log::error!("Incoming unhandled packet {packet:?}");
}
},
rumqttc::v5::Event::Outgoing(_) =>
{ }
}
Ok(())
}
/// Consumes the builder and creates the [`Client`] together with the future
/// that drives it.
///
/// The returned future must be polled (for example by spawning it on an
/// executor) for the client to make progress. It runs two loops concurrently:
///
/// * the MQTT event loop, which polls the connection and dispatches incoming
///   publishes through `handle_events`;
/// * the request loop, which executes the subscribe/publish requests sent by
///   [`Client`] handles.
///
/// Errors in either loop are logged and do not stop the future. The request
/// loop ends once every [`Client`] handle has been dropped; the event loop has
/// no exit condition, so the future never completes on its own.
pub fn start(self) -> (Client, impl std::future::Future<Output = ()>) {
    // Use the capacity configured on the builder instead of a fixed value, so
    // that `set_capacity` actually takes effect.
    let (client, mut eventloop) = rumqttc::v5::AsyncClient::new(self.mqttoptions, self.cap);
    let (request_sender, mut request_receiver) = futures::channel::mpsc::channel(10);
    let fut = async move {
        let senders: Arc<
            futures::lock::Mutex<HashMap<String, async_broadcast::Sender<RawMessage>>>,
        > = Default::default();
        let fut_mqtt = {
            let senders = senders.clone();
            async move {
                loop {
                    if let Err(e) = Self::handle_events(eventloop.poll().await, &senders).await {
                        log::error!("An error occured when handling MQTT event: {e:?}");
                    }
                }
            }
        };
        // This block borrows `client` instead of taking ownership of it, so the
        // rumqttc client stays alive (owned by the outer future) even after the
        // request loop has finished.
        let fut_req = async {
            // `next()` yields `None` once all `Client` handles are dropped.
            // Stop then: polling a closed channel again returns `None`
            // immediately and would spin without ever yielding.
            while let Some(request) = request_receiver.next().await {
                let r = match request {
                    Request::Subscribe { topic, sender, qos } => {
                        senders.lock().await.insert(topic.clone(), sender);
                        client.subscribe(topic, qos).await
                    }
                    Request::Publish {
                        topic,
                        qos,
                        retain,
                        payload,
                        properties: Some(properties),
                    } => {
                        client
                            .publish_with_properties(topic, qos, retain, payload, properties)
                            .await
                    }
                    Request::Publish {
                        topic,
                        qos,
                        retain,
                        payload,
                        properties: None,
                    } => client.publish(topic, qos, retain, payload).await,
                };
                if let Err(e) = r {
                    log::error!("An error occured when evaluating requests: {e:?}");
                }
            }
        };
        futures::join!(fut_mqtt, fut_req);
    };
    (
        Client {
            request_sender,
            subscriptions: Default::default(),
        },
        fut,
    )
}
}
#[derive(Clone)]
pub struct Client
{
subscriptions: std::sync::Arc<
futures::lock::Mutex<HashMap<String, async_broadcast::InactiveReceiver<RawMessage>>>,
>,
request_sender: futures::channel::mpsc::Sender<Request>,
}
/// Creates a broadcast channel for `$topic`, sends the matching
/// `Request::Subscribe` through `$request_sender` and records an inactive
/// receiver in `$subs`.
///
/// Expands to a block evaluating to `Ok(receiver)`. It must be used inside an
/// `async` function returning [`Result`], because a failed send is propagated
/// with `?`.
macro_rules! _create_raw_subscription {
    ($request_sender: expr, $subs: ident, $topic: ident, $qos: ident) => {{
        let (sender, r) = async_broadcast::broadcast::<RawMessage>(10);
        // Send the request before touching the map: if the send fails, no entry
        // may be left behind, otherwise later calls would report the topic as
        // subscribed although the request never reached the request loop.
        $request_sender
            .send(Request::Subscribe {
                topic: $topic.clone(),
                sender,
                qos: $qos,
            })
            .await?;
        $subs.insert($topic, r.clone().deactivate());
        Ok(r)
    }};
}
impl Client
{
/// Creates a [`ClientBuilder`] for a connection to `hostname:port`.
///
/// `node_id`, `hostname` and `port` are forwarded to
/// `rumqttc::v5::MqttOptions::new`. The keep-alive interval is set to five
/// seconds and the builder's capacity value starts at 100.
pub fn build(
    node_id: impl Into<String>,
    hostname: impl Into<String>,
    port: u16,
) -> ClientBuilder {
    let keep_alive = std::time::Duration::from_secs(5);
    let mut options = rumqttc::v5::MqttOptions::new(node_id, hostname, port);
    options.set_keep_alive(keep_alive);
    ClientBuilder {
        mqttoptions: options,
        cap: 100,
    }
}
/// Queues a publish of the raw `payload` on `topic`.
///
/// The request is handed to the request loop over the internal channel. The
/// returned future resolves once the request has been queued, not once the
/// message has been sent.
///
/// # Errors
///
/// Returns [`Error::RequestChannel`] if the request cannot be sent over the
/// internal channel.
pub async fn publish_raw(
    mut self,
    topic: impl Into<String>,
    qos: QoS,
    retain: bool,
    payload: impl Into<bytes::Bytes>,
    properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
) -> Result<()> {
    let request = Request::Publish {
        topic: topic.into(),
        qos,
        retain,
        payload: payload.into(),
        properties,
    };
    self.request_sender.send(request).await?;
    Ok(())
}
/// Serializes `payload` as JSON and returns a future that publishes it on
/// `topic`.
///
/// The content type of the publish properties is always set to
/// `application/json`; all other fields of `properties` are kept, and default
/// properties are used when `properties` is `None`.
///
/// Serialization happens immediately; only the publish itself is deferred to
/// the returned future (see `publish_raw`).
///
/// # Errors
///
/// Returns [`Error::JsonSerialisation`] if `payload` cannot be serialized. The
/// returned future fails as described for `publish_raw`.
#[cfg(feature = "json")]
pub fn publish_json<T: serde::Serialize>(
    self,
    topic: impl Into<String>,
    qos: QoS,
    retain: bool,
    payload: T,
    properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
) -> Result<impl std::future::Future<Output = Result<()>>> {
    // `properties` is owned, so it can be modified in place without cloning.
    let mut properties = properties.unwrap_or_default();
    properties.content_type = Some("application/json".to_string());
    let payload_raw = serde_json::to_string(&payload)?;
    Ok(self.publish_raw(topic, qos, retain, payload_raw, Some(properties)))
}
/// Serializes `payload` as CBOR and returns a future that publishes it on
/// `topic`.
///
/// The content type of the publish properties is always set to
/// `application/cbor`; all other fields of `properties` are kept, and default
/// properties are used when `properties` is `None`.
///
/// Serialization happens immediately; only the publish itself is deferred to
/// the returned future (see `publish_raw`).
///
/// # Errors
///
/// Returns [`Error::CborSerialisation`] if `payload` cannot be serialized. The
/// returned future fails as described for `publish_raw`.
#[cfg(feature = "cbor")]
pub fn publish_cbor<T: serde::Serialize>(
    self,
    topic: impl Into<String>,
    qos: QoS,
    retain: bool,
    payload: T,
    properties: Option<rumqttc::v5::mqttbytes::v5::PublishProperties>,
) -> Result<impl std::future::Future<Output = Result<()>>> {
    // `properties` is owned, so it can be modified in place without cloning.
    let mut properties = properties.unwrap_or_default();
    properties.content_type = Some("application/cbor".to_string());
    let mut payload_raw = Vec::<u8>::new();
    ciborium::into_writer(&payload, &mut payload_raw)?;
    Ok(self.publish_raw(topic, qos, retain, payload_raw, Some(properties)))
}
pub async fn create_raw_subscription(
mut self,
topic: impl Into<String>,
qos: QoS,
) -> Result<async_broadcast::Receiver<RawMessage>>
{
let mut subs = self.subscriptions.lock().await;
let topic = topic.into();
match subs.get(&topic)
{
Some(_) => Err(Error::AlreadySubscribed(topic)),
None => _create_raw_subscription!(self.request_sender, subs, topic, qos),
}
}
pub fn get_raw_subscription(
&self,
topic: impl Into<String>,
) -> Result<async_broadcast::Receiver<RawMessage>>
{
let topic = topic.into();
futures::executor::block_on(self.subscriptions.lock())
.get(&topic)
.ok_or_else(|| Error::NotSubscribed(topic))
.map(|x| x.activate_cloned())
}
/// Returns a receiver for the raw messages on `topic`, creating the
/// subscription first if none exists.
///
/// When a subscription already exists, `qos` is not used and a new active
/// receiver is cloned from the stored inactive one.
///
/// # Errors
///
/// Returns [`Error::RequestChannel`] if a new subscription is needed and the
/// subscribe request cannot be sent.
pub async fn get_or_create_raw_subscription(
    mut self,
    topic: impl Into<String>,
    qos: QoS,
) -> Result<async_broadcast::Receiver<RawMessage>> {
    let mut subs = self.subscriptions.lock().await;
    let topic = topic.into();
    if let Some(existing) = subs.get(&topic) {
        return Ok(existing.activate_cloned());
    }
    _create_raw_subscription!(self.request_sender, subs, topic, qos)
}
/// Returns a stream of JSON-decoded messages for `topic`, creating the
/// subscription first if none exists.
///
/// Every raw message is parsed as a `T`. Messages that fail to parse are
/// logged and left out of the stream.
///
/// # Errors
///
/// Fails as described for `get_or_create_raw_subscription`.
#[cfg(feature = "json")]
pub async fn get_or_create_json_subscription<T>(
    self,
    topic: impl Into<String>,
    qos: QoS,
) -> Result<impl futures::Stream<Item = Message<T>>>
where
    for<'de> T: serde::Deserialize<'de>,
{
    use bytes::Buf;

    let raw_stream = self.get_or_create_raw_subscription(topic, qos).await?;
    let decoded = raw_stream.filter_map(|raw| async move {
        let Message {
            payload: raw_payload,
            properties,
        } = raw;
        serde_json::from_slice::<T>(raw_payload.chunk())
            .map(|payload| Message::<T> {
                payload,
                properties,
            })
            .map_err(|e| log::error!("Failed to parse JSON message: {e}"))
            .ok()
    });
    Ok(decoded)
}
}
#[cfg(test)]
mod tests {
    /// Placeholder test; it does not exercise any code from this file.
    #[test]
    fn it_works() {
        assert_eq!(4, 4);
    }
}