#![doc(html_logo_url = "https://raw.githubusercontent.com/montoyo/lmc-rs/master/logo.svg", html_favicon_url = "https://raw.githubusercontent.com/montoyo/lmc-rs/master/favicon.ico")]
#![cfg_attr(feature = "tls", doc = r##"```
# tokio_test::block_on(async {
use lmc::{Options, Client, QoS};
let mut opts = Options::new("client_id")
.enable_tls()
.expect("Failed to load native system TLS certificates");
opts.set_username("username")
.set_password(b"password");
# return; //We can't really test this in doctests
let (client, shutdown_handle) = Client::connect("localhost", opts)
.await
.expect("Failed to connect to broker!");
let (subscription, sub_qos) = client.subscribe_unbounded("my_topic", QoS::AtLeastOnce)
.await
.expect("Failed to subscribe to 'my_topic'");
println!("Subscribed to topic with QoS {:?}", sub_qos);
client.publish_qos_1("my_topic", b"it works!", false, true)
.await
.expect("Failed to publish message in 'my_topic'");
let msg = subscription.recv().await.expect("Failed to await message");
println!("Received {}", msg.payload_as_utf8().unwrap());
shutdown_handle.disconnect().await.expect("Could not disconnect gracefully");
# });
```"##)]
#![feature(new_uninit)]
#![feature(get_mut_unchecked)]
#![feature(const_trait_impl)]
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tokio::net::{lookup_host, TcpSocket};
use tokio::sync::{mpsc, oneshot};
use tokio::task::{JoinHandle, JoinError};
use tokio::time;
#[cfg(feature = "tls")]
pub mod tls;
pub mod subs;
pub mod options;
mod transport;
mod transceiver;
mod futures;
mod errors;
mod shared;
mod wrappers;
#[cfg(test)]
mod tests;
pub use errors::{ConnectError, PublishError, TryPublishError, SubscribeError, TimeoutKind, ServerConnectError};
pub use options::{Options, LastWill};
pub use transceiver::ShutdownStatus;
pub use transceiver::packets::{IncomingPublishPacket as Message, PublishFlags, PublishPacketInfo as MessageInfo};
use options::{ConnectionConfig, OptionsT};
use transceiver::{TransceiverBuildData, Transceiver};
use transceiver::commands::{Command, PublishCommand, SubscribeCommand, UnsubCommand, UnsubKind, SubscriptionKind, FastCallback};
use transceiver::packets::{ConnectPacket, Encode, OutgoingPublishPacket};
use futures::*;
use shared::*;
#[cfg(feature = "tls")]
pub use tls::OptionsWithTls;
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum QoS
{
AtMostOnce = 0,
AtLeastOnce = 1,
ExactlyOnce = 2
}
#[derive(Clone)]
pub struct Client
{
was_session_present: bool,
cmd_queue: mpsc::Sender<Command>,
shared: Arc<ClientShared>
}
pub struct ClientShutdownHandle
{
cmd_queue: mpsc::Sender<Command>,
join_handle: JoinHandle<ShutdownStatus>
}
#[derive(Debug, Clone, Copy)]
pub enum PublishEvent
{
None,
Received,
Complete
}
enum PublishEventFuture
{
None,
Received(PublishFuture<RecNotifierMapAccessor>),
Complete(PublishFuture<CompNotifierMapAccessor>)
}
enum SubWait
{
DontWait(QoS),
Before,
After
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SubscriptionStatus
{
Absent,
Pending,
Live
}
impl Client
{
pub async fn connect<'a, C, CC>(host: &str, options: OptionsT<'a, CC>) -> Result<(Client, ClientShutdownHandle), ConnectError>
where CC: ConnectionConfig<C>
{
let (options, conn_cfg) = options.separate_connection_cfg();
let tmp;
let (host_and_port, port_pos) = match host.find(':') {
Some(x) => (host, x),
None => {
tmp = format!("{}:{}", host, options.default_port());
(tmp.as_str(), host.len())
}
};
let host_only = &host_and_port[..port_pos];
let conn = conn_cfg.create_connection(host_only)?;
let addr = time::timeout(options.dns_timeout, lookup_host(host_and_port)).await
.map_err(|_| ConnectError::Timeout(TimeoutKind::DnsLookup))?
.map_err(ConnectError::LookupHostError)?
.filter(|addr| options.enabled_ip_versions.supports(addr))
.next()
.ok_or(ConnectError::HostnameNotFound)?;
log::debug!("Connecting to {:?}", addr);
let socket = (if addr.is_ipv6() { TcpSocket::new_v6() } else { TcpSocket::new_v4() }).map_err(ConnectError::IoError)?;
let stream = time::timeout(options.tcp_connect_timeout, socket.connect(addr)).await
.map_err(|_| ConnectError::Timeout(TimeoutKind::TcpConnect))?
.map_err(ConnectError::IoError)?;
if options.no_delay {
stream.set_nodelay(true).map_err(ConnectError::IoError)?;
}
let (cmd_tx, cmd_rx) = mpsc::channel(1024);
let (conn_tx, conn_rx) = oneshot::channel();
let transport = CC::create_transport(stream, conn);
let connect_packet = ConnectPacket {
clean_session: options.clean_session,
keep_alive: options.keep_alive,
client_id: options.client_id,
will: options.last_will,
username: options.username,
password: options.password
}.make_arc_packet();
let shared = Arc::new(ClientShared::new());
let build_data = TransceiverBuildData {
transport,
cmd_queue: cmd_rx,
connect_sig: conn_tx,
connect_packet,
ping_interval: Duration::from_secs(options.keep_alive as u64),
shared: shared.clone(),
packet_resend_delay: options.packets_resend_delay.max(Duration::from_secs(1))
};
let join_handle = Transceiver::spawn(build_data);
let conn_result = time::timeout(options.mqtt_connect_timeout, conn_rx).await;
let conn_result = match conn_result {
Ok(Ok(Ok(x))) => Ok(x),
Ok(Ok(Err(x))) => Err(ConnectError::ServerError(x)),
Ok(Err(_)) => Err(ConnectError::OneshotRecvError),
Err(_) => {
let _ = cmd_tx.send(Command::Disconnect).await;
return Err(ConnectError::Timeout(TimeoutKind::MqttConnect));
}
};
match conn_result {
Ok(was_session_present) => {
let shutdown_handle = ClientShutdownHandle {
cmd_queue: cmd_tx.clone(),
join_handle
};
let client = Client {
was_session_present,
cmd_queue: cmd_tx,
shared
};
Ok((client, shutdown_handle))
},
Err(err) => {
drop(cmd_tx);
let _ = join_handle.await;
Err(err)
}
}
}
pub fn was_session_present(&self) -> bool
{
self.was_session_present
}
#[inline]
fn make_publish_cmd(&self, topic: &str, payload: &[u8], qos: QoS, retain: bool) -> PublishCommand
{
let packet_id = if qos == QoS::AtMostOnce { 0 } else { self.shared.next_packet_id.fetch_add(1, Ordering::Relaxed) };
let packet = OutgoingPublishPacket {
flags: PublishFlags::new(false, qos, retain),
topic, packet_id, payload
};
PublishCommand {
packet_id,
qos,
packet: packet.make_arc_packet()
}
}
pub fn try_publish(&self, topic: &str, payload: &[u8], qos: QoS, retain: bool) -> Result<(), TryPublishError>
{
use mpsc::error::*;
let cmd = self.make_publish_cmd(topic, payload, qos, retain);
match self.cmd_queue.try_send(cmd.into()) {
Ok(x) => Ok(x),
Err(TrySendError::Closed(_)) => Err(TryPublishError::TransceiverTaskTerminated),
Err(TrySendError::Full(_)) => Err(TryPublishError::QueueFull)
}
}
pub async fn publish_no_wait(&self, topic: &str, payload: &[u8], qos: QoS, retain: bool) -> Result<(), PublishError>
{
let cmd = self.make_publish_cmd(topic, payload, qos, retain);
self.cmd_queue.send(cmd.into()).await.map_err(|_| PublishError::TransceiverTaskTerminated)
}
pub async fn publish_qos_0(&self, topic: &str, payload: &[u8], retain: bool) -> Result<(), PublishError>
{
let cmd = self.make_publish_cmd(topic, payload, QoS::AtMostOnce, retain);
self.cmd_queue.send(cmd.into()).await.map_err(|_| PublishError::TransceiverTaskTerminated)
}
pub async fn publish_qos_1(&self, topic: &str, payload: &[u8], retain: bool, await_ack: bool) -> Result<(), PublishError>
{
let cmd = self.make_publish_cmd(topic, payload, QoS::AtLeastOnce, retain);
let opt_fut = match await_ack {
true => Some(PublishFuture::new(cmd.packet_id, AckNotifierMapAccessor(self.shared.clone()))),
false => None
};
self.cmd_queue.send(cmd.into()).await.map_err(|_| PublishError::TransceiverTaskTerminated)?;
match opt_fut {
Some(x) => if x.await { Err(PublishError::TransceiverTaskTerminated) } else { Ok(()) },
None => Ok(())
}
}
pub async fn publish_qos_2(&self, topic: &str, payload: &[u8], retain: bool, await_event: PublishEvent) -> Result<(), PublishError>
{
let cmd = self.make_publish_cmd(topic, payload, QoS::ExactlyOnce, retain);
let opt_fut = match await_event {
PublishEvent::None => PublishEventFuture::None,
PublishEvent::Received => PublishEventFuture::Received(PublishFuture::new(cmd.packet_id, RecNotifierMapAccessor(self.shared.clone()))),
PublishEvent::Complete => PublishEventFuture::Complete(PublishFuture::new(cmd.packet_id, CompNotifierMapAccessor(self.shared.clone())))
};
self.cmd_queue.send(cmd.into()).await.map_err(|_| PublishError::TransceiverTaskTerminated)?;
let ttt = match opt_fut {
PublishEventFuture::None => false,
PublishEventFuture::Received(x) => x.await,
PublishEventFuture::Complete(x) => x.await
};
if ttt { Err(PublishError::TransceiverTaskTerminated) } else { Ok(()) }
}
async fn subscribe<R, F>(&self, topic: &str, qos_hint: QoS, func: F) -> Result<(R, QoS), SubscribeError>
where F: FnOnce() -> (R, SubscriptionKind)
{
let mut sub_map = self.shared.subs.lock();
let actual_qos = match sub_map.get_mut(topic) {
Some(SubscriptionState::Existing(qos)) => SubWait::DontWait(*qos), Some(SubscriptionState::Pending(_)) => SubWait::Before, None => {
sub_map.insert(topic.into(), SubscriptionState::Pending(Vec::new()));
SubWait::After
}
};
drop(sub_map);
let actual_qos = match actual_qos {
SubWait::DontWait(x) => Some(x),
SubWait::Before => Some(SubscribeFuture::new(&self.shared, topic).await.map_err(|_| SubscribeError::RefusedByBroker)?),
SubWait::After => None
};
let (ret, kind) = func();
let cmd = SubscribeCommand {
topic: topic.into(),
qos: qos_hint,
kind
};
self.cmd_queue.send(cmd.into()).await.map_err(|_| SubscribeError::TransceiverTaskTerminated)?;
let actual_qos = match actual_qos {
Some(x) => x,
None => SubscribeFuture::new(&self.shared, topic).await.map_err(|_| SubscribeError::RefusedByBroker)?
};
Ok((ret, actual_qos))
}
pub async fn subscribe_void(&self, topic: String, qos_hint: QoS) -> Result<QoS, SubscribeError>
{
self.subscribe(&topic, qos_hint, move || ((), SubscriptionKind::Void)).await.map(|(_, qos)| qos)
}
pub fn subscribe_lossy<'a>(&'a self, topic: &'a str, qos_hint: QoS, queue_cap: usize) -> impl Future<Output = Result<(mpsc::Receiver<Message>, QoS), SubscribeError>> + 'a
{
self.subscribe(topic, qos_hint, move || {
let (tx, rx) = mpsc::channel(queue_cap);
(rx, SubscriptionKind::Lossy(tx))
})
}
pub fn subscribe_unbounded<'a>(&'a self, topic: &'a str, qos_hint: QoS) -> impl Future<Output = Result<(mpsc::UnboundedReceiver<Message>, QoS), SubscribeError>> + 'a
{
self.subscribe(topic, qos_hint, move || {
let (tx, rx) = mpsc::unbounded_channel();
(rx, SubscriptionKind::Unbounded(tx))
})
}
pub async fn subscribe_fast_callback<C>(&self, topic: String, qos_hint: QoS, callback: C) -> Result<(subs::Callback, QoS), SubscribeError>
where C: FnMut(Message) + Send + Sync + 'static
{
let (id, qos) = self.subscribe(&topic, qos_hint, move || {
let id = self.shared.next_callback_id.fetch_add(1, Ordering::Relaxed);
(id, SubscriptionKind::FastCallback(FastCallback { id, f: Box::new(callback) }))
}).await?;
let ret = subs::Callback::new(self.cmd_queue.clone(), topic, id);
Ok((ret, qos))
}
pub fn get_subscription_status(&self, topic: &str) -> SubscriptionStatus
{
match self.shared.subs.lock().get(topic) {
Some(SubscriptionState::Existing(_)) => SubscriptionStatus::Live,
Some(SubscriptionState::Pending(_)) => SubscriptionStatus::Pending,
None => SubscriptionStatus::Absent
}
}
pub async fn unsubscribe(&self, topic: String)
{
let cmd = UnsubCommand { topic, kind: UnsubKind::Immediate };
let _ = self.cmd_queue.send(cmd.into()).await;
}
pub fn is_transceiver_task_running(&self) -> bool
{
!self.cmd_queue.is_closed()
}
pub fn did_transceiver_task_stop(&self) -> bool
{
self.cmd_queue.is_closed()
}
pub async fn disconnect(self)
{
let _ = self.cmd_queue.send(Command::Disconnect).await;
}
}
impl ClientShutdownHandle
{
pub async fn disconnect(self) -> Result<ShutdownStatus, JoinError>
{
let _ = self.cmd_queue.send(Command::Disconnect).await;
self.join_handle.await
}
}