use crate::error::{ClientError, ConnectError};
use crate::MqttOptions;
use crossbeam_channel;
use futures::{sync::mpsc, Future, Sink};
use mqtt311::{PacketIdentifier, Publish, QoS, Subscribe, Unsubscribe, SubscribeTopic};
use std::sync::Arc;
// Submodules of the client. Each one is `pub` but marked `#[doc(hidden)]`,
// so it is reachable by path while being left out of the rendered docs.
//
// `connection` provides `Connection::run`, which `MqttClient::start` calls.
#[doc(hidden)]
pub mod connection;
// NOTE(review): contents not visible from this file — presumably MQTT session state; confirm.
#[doc(hidden)]
pub mod mqttstate;
// NOTE(review): contents not visible from this file — presumably network transport; confirm.
#[doc(hidden)]
pub mod network;
// NOTE(review): contents not visible from this file; confirm purpose.
#[doc(hidden)]
pub mod prepend;
/// Events delivered to the user over the `crossbeam_channel::Receiver`
/// returned by `MqttClient::start`.
///
/// NOTE(review): the exact conditions under which each variant is emitted are
/// decided outside this file (presumably in `connection`) — confirm there.
#[derive(Debug)]
pub enum Notification {
/// Presumably signals that the connection was re-established — confirm.
Reconnection,
/// Presumably signals that the connection was lost or closed — confirm.
Disconnection,
/// Carries an `mqtt311::Publish` packet.
Publish(Publish),
/// Carries the packet identifier of a PUBACK.
PubAck(PacketIdentifier),
/// Carries the packet identifier of a PUBREC.
PubRec(PacketIdentifier),
/// Carries the packet identifier of a PUBREL.
PubRel(PacketIdentifier),
/// Carries the packet identifier of a PUBCOMP.
PubComp(PacketIdentifier),
/// Carries the packet identifier of a SUBACK.
SubAck(PacketIdentifier),
/// Placeholder variant carrying no data.
None,
}
/// Messages sent over the request channel (`mpsc::Sender<Request>`).
///
/// In this file `MqttClient` only sends `Publish`, `Subscribe`, `Unsubscribe`
/// and `Disconnect`; the remaining variants are presumably produced
/// elsewhere in the crate — confirm.
#[doc(hidden)]
// NOTE(review): the lint is presumably silenced because some payloads (e.g.
// `Reconnect(MqttOptions)`) are much larger than the others — confirm.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Request {
/// Publish the given packet; sent by `MqttClient::publish`.
Publish(Publish),
/// Subscribe request; sent by `MqttClient::subscribe`.
Subscribe(Subscribe),
/// Unsubscribe request; sent by `MqttClient::unsubscribe`.
Unsubscribe(Unsubscribe),
/// Carries the packet identifier for a PUBACK.
PubAck(PacketIdentifier),
/// Carries the packet identifier for a PUBREC.
PubRec(PacketIdentifier),
/// Carries the packet identifier for a PUBREL.
PubRel(PacketIdentifier),
/// Carries the packet identifier for a PUBCOMP.
PubComp(PacketIdentifier),
/// NOTE(review): presumably related to keep-alive on the incoming side — confirm.
IncomingIdlePing,
/// NOTE(review): presumably related to keep-alive on the outgoing side — confirm.
OutgoingIdlePing,
/// Carries the `MqttOptions` to use; presumably triggers a reconnect — confirm.
Reconnect(MqttOptions),
/// Sent by `MqttClient::shutdown`.
Disconnect,
/// Placeholder variant carrying no data.
None,
}
/// Control messages sent over the command channel (`mpsc::Sender<Command>`),
/// separate from the `Request` channel.
#[doc(hidden)]
#[derive(Debug)]
pub enum Command {
/// Sent by `MqttClient::pause`.
Pause,
/// Sent by `MqttClient::resume`.
Resume,
}
/// Bundle of channel endpoints produced by `connection::Connection::run` and
/// unpacked by `MqttClient::start`.
#[doc(hidden)]
pub struct UserHandle {
// Sending side for `Request` messages; ends up inside `MqttClient`.
request_tx: mpsc::Sender<Request>,
// Sending side for `Command` messages; ends up inside `MqttClient`.
command_tx: mpsc::Sender<Command>,
// Receiving side for `Notification`s; returned to the caller of `MqttClient::start`.
notification_rx: crossbeam_channel::Receiver<Notification>,
}
/// Handle used to publish, subscribe, unsubscribe, pause/resume and shut down
/// by sending messages over the request and command channels.
///
/// The handle derives `Clone`; a clone carries clones of both senders and the
/// same packet size limit.
#[derive(Clone)]
pub struct MqttClient {
// Channel on which `Request` messages are sent.
request_tx: mpsc::Sender<Request>,
// Channel on which `Command` messages are sent.
command_tx: mpsc::Sender<Command>,
// Limit read from `MqttOptions::max_packet_size()` in `start`; `publish`
// rejects payloads whose length exceeds it.
max_packet_size: usize,
}
impl MqttClient {
pub fn start(opts: MqttOptions) -> Result<(Self, crossbeam_channel::Receiver<Notification>), ConnectError> {
let max_packet_size = opts.max_packet_size();
let UserHandle {
request_tx,
command_tx,
notification_rx,
} = connection::Connection::run(opts)?;
let client = MqttClient {
request_tx,
command_tx,
max_packet_size,
};
Ok((client, notification_rx))
}
pub fn publish<S, V, B>(&mut self, topic: S, qos: QoS, retained: B, payload: V) -> Result<(), ClientError>
where
S: Into<String>,
V: Into<Vec<u8>>,
B: Into<bool>,
{
let payload = payload.into();
if payload.len() > self.max_packet_size {
return Err(ClientError::PacketSizeLimitExceeded);
}
let publish = Publish {
dup: false,
qos,
retain: retained.into(),
topic_name: topic.into(),
pkid: None,
payload: Arc::new(payload),
};
let tx = &mut self.request_tx;
tx.send(Request::Publish(publish)).wait()?;
Ok(())
}
pub fn subscribe<S>(&mut self, topic: S, qos: QoS) -> Result<(), ClientError>
where
S: Into<String>,
{
let topic = SubscribeTopic {
topic_path: topic.into(),
qos,
};
let subscribe = Subscribe {
pkid: PacketIdentifier::zero(),
topics: vec![topic],
};
let tx = &mut self.request_tx;
tx.send(Request::Subscribe(subscribe)).wait()?;
Ok(())
}
pub fn unsubscribe<S>(&mut self, topic: S) -> Result<(), ClientError>
where
S: Into<String>,
{
let unsubscribe = Unsubscribe {
pkid: PacketIdentifier::zero(),
topics: vec![topic.into()],
};
let tx = &mut self.request_tx;
tx.send(Request::Unsubscribe(unsubscribe)).wait()?;
Ok(())
}
pub fn pause(&mut self) -> Result<(), ClientError> {
let tx = &mut self.command_tx;
tx.send(Command::Pause).wait()?;
Ok(())
}
pub fn resume(&mut self) -> Result<(), ClientError> {
let tx = &mut self.command_tx;
tx.send(Command::Resume).wait()?;
Ok(())
}
pub fn shutdown(&mut self) -> Result<(), ClientError> {
let tx = &mut self.request_tx;
tx.send(Request::Disconnect).wait()?;
Ok(())
}
}