use crate::{ConnectionError, Event, EventLoop, MqttOptions, Request};
use async_channel::{SendError, Sender, TrySendError};
use bytes::Bytes;
use mqttbytes::v4::*;
use mqttbytes::*;
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
#[error("Failed to send cancel request to eventloop")]
Cancel(#[from] SendError<()>),
#[error("Failed to send mqtt requests to eventloop")]
Request(#[from] SendError<Request>),
#[error("Failed to send mqtt requests to eventloop")]
TryRequest(#[from] TrySendError<Request>),
#[error("Serialization error")]
Mqtt4(mqttbytes::Error),
}
#[derive(Clone, Debug)]
pub struct AsyncClient {
request_tx: Sender<Request>,
cancel_tx: Sender<()>,
}
impl AsyncClient {
pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
let mut eventloop = EventLoop::new(options, cap);
let request_tx = eventloop.handle();
let cancel_tx = eventloop.cancel_handle();
let client = AsyncClient { request_tx, cancel_tx };
(client, eventloop)
}
pub fn from_senders(request_tx: Sender<Request>, cancel_tx: Sender<()>) -> AsyncClient {
AsyncClient { request_tx, cancel_tx }
}
pub async fn publish<S, V>(&self, topic: S, qos: QoS, retain: bool, payload: V) -> Result<(), ClientError>
where
S: Into<String>,
V: Into<Vec<u8>>,
{
let mut publish = Publish::new(topic, qos, payload);
publish.retain = retain;
let publish = Request::Publish(publish);
self.request_tx.send(publish).await?;
Ok(())
}
pub fn try_publish<S, V>(&self, topic: S, qos: QoS, retain: bool, payload: V) -> Result<(), ClientError>
where
S: Into<String>,
V: Into<Vec<u8>>,
{
let mut publish = Publish::new(topic, qos, payload);
publish.retain = retain;
let publish = Request::Publish(publish);
self.request_tx.try_send(publish)?;
Ok(())
}
pub async fn publish_bytes<S>(&self, topic: S, qos: QoS, retain: bool, payload: Bytes) -> Result<(), ClientError>
where
S: Into<String>,
{
let mut publish = Publish::from_bytes(topic, qos, payload);
publish.retain = retain;
let publish = Request::Publish(publish);
self.request_tx.send(publish).await?;
Ok(())
}
pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
let subscribe = Subscribe::new(topic.into(), qos);
let request = Request::Subscribe(subscribe);
self.request_tx.send(request).await?;
Ok(())
}
pub async fn subscribe_many<T>(&mut self, topics: T) -> Result<(), ClientError>
where
T: IntoIterator<Item = SubscribeFilter>,
{
let subscribe = Subscribe::new_many(topics);
let request = Request::Subscribe(subscribe);
self.request_tx.send(request).await?;
Ok(())
}
pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
let unsubscribe = Unsubscribe::new(topic.into());
let request = Request::Unsubscribe(unsubscribe);
self.request_tx.send(request).await?;
Ok(())
}
pub async fn disconnect(&self) -> Result<(), ClientError> {
let request = Request::Disconnect;
self.request_tx.send(request).await?;
Ok(())
}
pub async fn cancel(&self) -> Result<(), ClientError> {
self.cancel_tx.send(()).await?;
Ok(())
}
}
#[derive(Clone)]
pub struct Client {
client: AsyncClient,
}
impl Client {
pub fn new(options: MqttOptions, cap: usize) -> (Client, Connection) {
let (client, eventloop) = AsyncClient::new(options, cap);
let client = Client { client };
let connection = Connection::new(eventloop);
(client, connection)
}
pub fn publish<S, V>(&mut self, topic: S, qos: QoS, retain: bool, payload: V) -> Result<(), ClientError>
where
S: Into<String>,
V: Into<Vec<u8>>,
{
pollster::block_on(self.client.publish(topic, qos, retain, payload))?;
Ok(())
}
pub fn subscribe<S: Into<String>>(&mut self, topic: S, qos: QoS) -> Result<(), ClientError> {
pollster::block_on(self.client.subscribe(topic, qos))?;
Ok(())
}
pub fn subscribe_many<T>(&mut self, topics: T) -> Result<(), ClientError>
where
T: IntoIterator<Item = SubscribeFilter>,
{
pollster::block_on(self.client.subscribe_many(topics))
}
pub fn unsubscribe<S: Into<String>>(&mut self, topic: S) -> Result<(), ClientError> {
pollster::block_on(self.client.unsubscribe(topic))?;
Ok(())
}
pub fn disconnect(&mut self) -> Result<(), ClientError> {
pollster::block_on(self.client.disconnect())?;
Ok(())
}
pub fn cancel(&mut self) -> Result<(), ClientError> {
pollster::block_on(self.client.cancel())?;
Ok(())
}
}
pub struct Connection {
pub eventloop: EventLoop,
}
impl Connection {
fn new(eventloop: EventLoop) -> Connection {
Connection { eventloop }
}
#[must_use = "Connection should be iterated over a loop to make progress"]
pub fn iter(&mut self) -> Iter {
Iter { connection: self }
}
}
pub struct Iter<'a> {
connection: &'a mut Connection,
}
impl<'a> Iterator for Iter<'a> {
type Item = Result<Event, ConnectionError>;
fn next(&mut self) -> Option<Self::Item> {
let f = self.connection.eventloop.poll();
match async_std::task::block_on(f) {
Ok(v) => Some(Ok(v)),
Err(ConnectionError::RequestsDone) => {
trace!("Done with requests");
None
}
Err(ConnectionError::Cancel) => {
trace!("Cancellation request received");
None
}
Err(e) => Some(Err(e)),
}
}
}