use std::time::Duration;
use super::mqttbytes::v5::{
Auth, AuthProperties, AuthReasonCode, Filter, PubAck, PubRec, Publish, PublishProperties,
Subscribe, SubscribeProperties, Unsubscribe, UnsubscribeProperties,
};
use super::mqttbytes::QoS;
use super::{
ConnectionError, Disconnect, DisconnectProperties, DisconnectReasonCode, Event, EventLoop,
MqttOptions, Request,
};
use crate::{valid_filter, valid_topic};
use bytes::Bytes;
use flume::{SendError, Sender, TrySendError};
use futures_util::FutureExt;
use tokio::runtime::{self, Runtime};
use tokio::time::timeout;
#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)]
#[error("Invalid MQTT topic: '{0}'")]
pub struct InvalidTopic(String);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ValidatedTopic(String);
impl ValidatedTopic {
pub fn new<S: Into<String>>(topic: S) -> Result<Self, InvalidTopic> {
let topic_string = topic.into();
if valid_topic(&topic_string) {
Ok(Self(topic_string))
} else {
Err(InvalidTopic(topic_string))
}
}
}
impl From<ValidatedTopic> for String {
fn from(topic: ValidatedTopic) -> Self {
topic.0
}
}
mod private {
use super::ValidatedTopic;
pub trait Sealed {}
impl Sealed for ValidatedTopic {}
impl Sealed for String {}
impl Sealed for &str {}
}
pub trait Topic: private::Sealed {
const NEEDS_VALIDATION: bool;
fn into_string(self) -> String;
}
impl Topic for ValidatedTopic {
const NEEDS_VALIDATION: bool = false;
fn into_string(self) -> String {
self.0
}
}
impl Topic for String {
const NEEDS_VALIDATION: bool = true;
fn into_string(self) -> String {
self
}
}
impl Topic for &str {
const NEEDS_VALIDATION: bool = true;
fn into_string(self) -> String {
self.to_owned()
}
}
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
#[error("Failed to send mqtt requests to eventloop")]
Request(Box<Request>),
#[error("Failed to send mqtt requests to eventloop")]
TryRequest(Box<Request>),
}
impl From<SendError<Request>> for ClientError {
fn from(e: SendError<Request>) -> Self {
Self::Request(Box::new(e.into_inner()))
}
}
impl From<TrySendError<Request>> for ClientError {
fn from(e: TrySendError<Request>) -> Self {
Self::TryRequest(Box::new(e.into_inner()))
}
}
#[derive(Clone, Debug)]
pub struct AsyncClient {
request_tx: Sender<Request>,
}
impl AsyncClient {
pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
let (eventloop, request_tx) = EventLoop::new_for_async_client(options, cap);
let client = AsyncClient { request_tx };
(client, eventloop)
}
#[must_use]
pub fn from_senders(request_tx: Sender<Request>) -> AsyncClient {
AsyncClient { request_tx }
}
async fn handle_publish<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
properties: Option<PublishProperties>,
) -> Result<(), ClientError>
where
S: Topic,
P: Into<Bytes>,
{
let topic = topic.into_string();
let invalid_topic = S::NEEDS_VALIDATION && !valid_topic(&topic);
let mut publish = Publish::new(topic, qos, payload, properties);
publish.retain = retain;
let publish = Request::Publish(publish);
if invalid_topic {
return Err(ClientError::Request(Box::new(publish)));
}
self.request_tx.send_async(publish).await?;
Ok(())
}
pub async fn publish_with_properties<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
properties: PublishProperties,
) -> Result<(), ClientError>
where
S: Topic,
P: Into<Bytes>,
{
self.handle_publish(topic, qos, retain, payload, Some(properties))
.await
}
pub async fn publish<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
) -> Result<(), ClientError>
where
S: Topic,
P: Into<Bytes>,
{
self.handle_publish(topic, qos, retain, payload, None).await
}
fn handle_try_publish<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
properties: Option<PublishProperties>,
) -> Result<(), ClientError>
where
S: Topic,
P: Into<Bytes>,
{
let topic = topic.into_string();
let invalid_topic = S::NEEDS_VALIDATION && !valid_topic(&topic);
let mut publish = Publish::new(topic, qos, payload, properties);
publish.retain = retain;
let publish = Request::Publish(publish);
if invalid_topic {
return Err(ClientError::TryRequest(Box::new(publish)));
}
self.request_tx.try_send(publish)?;
Ok(())
}
pub fn try_publish_with_properties<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
properties: PublishProperties,
) -> Result<(), ClientError>
where
S: Topic,
P: Into<Bytes>,
{
self.handle_try_publish(topic, qos, retain, payload, Some(properties))
}
pub fn try_publish<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
) -> Result<(), ClientError>
where
S: Topic,
P: Into<Bytes>,
{
self.handle_try_publish(topic, qos, retain, payload, None)
}
pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
let ack = get_ack_req(publish);
if let Some(ack) = ack {
self.request_tx.send_async(ack).await?;
}
Ok(())
}
pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
let ack = get_ack_req(publish);
if let Some(ack) = ack {
self.request_tx.try_send(ack)?;
}
Ok(())
}
pub async fn reauth(&self, properties: Option<AuthProperties>) -> Result<(), ClientError> {
let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
let auth = Request::Auth(auth);
self.request_tx.send_async(auth).await?;
Ok(())
}
pub fn try_reauth(&self, properties: Option<AuthProperties>) -> Result<(), ClientError> {
let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
let auth = Request::Auth(auth);
self.request_tx.try_send(auth)?;
Ok(())
}
async fn handle_publish_bytes<S>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: Bytes,
properties: Option<PublishProperties>,
) -> Result<(), ClientError>
where
S: Topic,
{
let topic = topic.into_string();
let invalid_topic = S::NEEDS_VALIDATION && !valid_topic(&topic);
let mut publish = Publish::new(topic, qos, payload, properties);
publish.retain = retain;
let publish = Request::Publish(publish);
if invalid_topic {
return Err(ClientError::Request(Box::new(publish)));
}
self.request_tx.send_async(publish).await?;
Ok(())
}
pub async fn publish_bytes_with_properties<S>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: Bytes,
properties: PublishProperties,
) -> Result<(), ClientError>
where
S: Topic,
{
self.handle_publish_bytes(topic, qos, retain, payload, Some(properties))
.await
}
pub async fn publish_bytes<S>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: Bytes,
) -> Result<(), ClientError>
where
S: Topic,
{
self.handle_publish_bytes(topic, qos, retain, payload, None)
.await
}
async fn handle_subscribe<S: Into<String>>(
&self,
topic: S,
qos: QoS,
properties: Option<SubscribeProperties>,
) -> Result<(), ClientError> {
let filter = Filter::new(topic, qos);
let subscribe = Subscribe::new(filter, properties);
if !subscribe_has_valid_filters(&subscribe) {
return Err(ClientError::Request(Box::new(subscribe.into())));
}
self.request_tx.send_async(subscribe.into()).await?;
Ok(())
}
pub async fn subscribe_with_properties<S: Into<String>>(
&self,
topic: S,
qos: QoS,
properties: SubscribeProperties,
) -> Result<(), ClientError> {
self.handle_subscribe(topic, qos, Some(properties)).await
}
pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
self.handle_subscribe(topic, qos, None).await
}
fn handle_try_subscribe<S: Into<String>>(
&self,
topic: S,
qos: QoS,
properties: Option<SubscribeProperties>,
) -> Result<(), ClientError> {
let filter = Filter::new(topic, qos);
let subscribe = Subscribe::new(filter, properties);
if !subscribe_has_valid_filters(&subscribe) {
return Err(ClientError::TryRequest(Box::new(subscribe.into())));
}
self.request_tx.try_send(subscribe.into())?;
Ok(())
}
pub fn try_subscribe_with_properties<S: Into<String>>(
&self,
topic: S,
qos: QoS,
properties: SubscribeProperties,
) -> Result<(), ClientError> {
self.handle_try_subscribe(topic, qos, Some(properties))
}
pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
self.handle_try_subscribe(topic, qos, None)
}
async fn handle_subscribe_many<T>(
&self,
topics: T,
properties: Option<SubscribeProperties>,
) -> Result<(), ClientError>
where
T: IntoIterator<Item = Filter>,
{
let subscribe = Subscribe::new_many(topics, properties);
if !subscribe_has_valid_filters(&subscribe) {
return Err(ClientError::Request(Box::new(subscribe.into())));
}
self.request_tx.send_async(subscribe.into()).await?;
Ok(())
}
pub async fn subscribe_many_with_properties<T>(
&self,
topics: T,
properties: SubscribeProperties,
) -> Result<(), ClientError>
where
T: IntoIterator<Item = Filter>,
{
self.handle_subscribe_many(topics, Some(properties)).await
}
pub async fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
where
T: IntoIterator<Item = Filter>,
{
self.handle_subscribe_many(topics, None).await
}
fn handle_try_subscribe_many<T>(
&self,
topics: T,
properties: Option<SubscribeProperties>,
) -> Result<(), ClientError>
where
T: IntoIterator<Item = Filter>,
{
let subscribe = Subscribe::new_many(topics, properties);
if !subscribe_has_valid_filters(&subscribe) {
return Err(ClientError::TryRequest(Box::new(subscribe.into())));
}
self.request_tx.try_send(subscribe.into())?;
Ok(())
}
pub fn try_subscribe_many_with_properties<T>(
&self,
topics: T,
properties: SubscribeProperties,
) -> Result<(), ClientError>
where
T: IntoIterator<Item = Filter>,
{
self.handle_try_subscribe_many(topics, Some(properties))
}
pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
where
T: IntoIterator<Item = Filter>,
{
self.handle_try_subscribe_many(topics, None)
}
async fn handle_unsubscribe<S: Into<String>>(
&self,
topic: S,
properties: Option<UnsubscribeProperties>,
) -> Result<(), ClientError> {
let unsubscribe = Unsubscribe::new(topic, properties);
let request = Request::Unsubscribe(unsubscribe);
self.request_tx.send_async(request).await?;
Ok(())
}
pub async fn unsubscribe_with_properties<S: Into<String>>(
&self,
topic: S,
properties: UnsubscribeProperties,
) -> Result<(), ClientError> {
self.handle_unsubscribe(topic, Some(properties)).await
}
pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
self.handle_unsubscribe(topic, None).await
}
fn handle_try_unsubscribe<S: Into<String>>(
&self,
topic: S,
properties: Option<UnsubscribeProperties>,
) -> Result<(), ClientError> {
let unsubscribe = Unsubscribe::new(topic, properties);
let request = Request::Unsubscribe(unsubscribe);
self.request_tx.try_send(request)?;
Ok(())
}
pub fn try_unsubscribe_with_properties<S: Into<String>>(
&self,
topic: S,
properties: UnsubscribeProperties,
) -> Result<(), ClientError> {
self.handle_try_unsubscribe(topic, Some(properties))
}
pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
self.handle_try_unsubscribe(topic, None)
}
pub async fn disconnect(&self) -> Result<(), ClientError> {
self.handle_disconnect(DisconnectReasonCode::NormalDisconnection, None)
.await
}
pub async fn disconnect_with_properties(
&self,
reason: DisconnectReasonCode,
properties: DisconnectProperties,
) -> Result<(), ClientError> {
self.handle_disconnect(reason, Some(properties)).await
}
async fn handle_disconnect(
&self,
reason: DisconnectReasonCode,
properties: Option<DisconnectProperties>,
) -> Result<(), ClientError> {
let request = self.build_disconnect_request(reason, properties);
self.request_tx.send_async(request).await?;
Ok(())
}
pub fn try_disconnect(&self) -> Result<(), ClientError> {
self.handle_try_disconnect(DisconnectReasonCode::NormalDisconnection, None)
}
pub fn try_disconnect_with_properties(
&self,
reason: DisconnectReasonCode,
properties: DisconnectProperties,
) -> Result<(), ClientError> {
self.handle_try_disconnect(reason, Some(properties))
}
fn handle_try_disconnect(
&self,
reason: DisconnectReasonCode,
properties: Option<DisconnectProperties>,
) -> Result<(), ClientError> {
let request = self.build_disconnect_request(reason, properties);
self.request_tx.try_send(request)?;
Ok(())
}
fn build_disconnect_request(
&self,
reason: DisconnectReasonCode,
properties: Option<DisconnectProperties>,
) -> Request {
match properties {
Some(p) => Request::Disconnect(Disconnect::new_with_properties(reason, p)),
None => Request::Disconnect(Disconnect::new(reason)),
}
}
}
fn get_ack_req(publish: &Publish) -> Option<Request> {
let ack = match publish.qos {
QoS::AtMostOnce => return None,
QoS::AtLeastOnce => Request::PubAck(PubAck::new(publish.pkid, None)),
QoS::ExactlyOnce => Request::PubRec(PubRec::new(publish.pkid, None)),
};
Some(ack)
}
#[derive(Clone)]
pub struct Client {
client: AsyncClient,
}
impl Client {
pub fn new(options: MqttOptions, cap: usize) -> (Client, Connection) {
let (client, eventloop) = AsyncClient::new(options, cap);
let client = Client { client };
let runtime = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let connection = Connection::new(eventloop, runtime);
(client, connection)
}
#[must_use]
pub fn from_sender(request_tx: Sender<Request>) -> Client {
Client {
client: AsyncClient::from_senders(request_tx),
}
}
fn handle_publish<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
properties: Option<PublishProperties>,
) -> Result<(), ClientError>
where
S: Topic,
P: Into<Bytes>,
{
let topic = topic.into_string();
let invalid_topic = S::NEEDS_VALIDATION && !valid_topic(&topic);
let mut publish = Publish::new(topic, qos, payload, properties);
publish.retain = retain;
let request = Request::Publish(publish);
if invalid_topic {
return Err(ClientError::Request(Box::new(request)));
}
self.client.request_tx.send(request)?;
Ok(())
}
pub fn publish_with_properties<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
properties: PublishProperties,
) -> Result<(), ClientError>
where
S: Topic,
P: Into<Bytes>,
{
self.handle_publish(topic, qos, retain, payload, Some(properties))
}
pub fn publish<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
) -> Result<(), ClientError>
where
S: Topic,
P: Into<Bytes>,
{
self.handle_publish(topic, qos, retain, payload, None)
}
pub fn try_publish_with_properties<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
properties: PublishProperties,
) -> Result<(), ClientError>
where
S: Topic,
P: Into<Bytes>,
{
self.client
.try_publish_with_properties(topic, qos, retain, payload, properties)
}
pub fn try_publish<S, P>(
&self,
topic: S,
qos: QoS,
retain: bool,
payload: P,
) -> Result<(), ClientError>
where
S: Topic,
P: Into<Bytes>,
{
self.client.try_publish(topic, qos, retain, payload)
}
pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
let ack = get_ack_req(publish);
if let Some(ack) = ack {
self.client.request_tx.send(ack)?;
}
Ok(())
}
pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
self.client.try_ack(publish)?;
Ok(())
}
pub fn reauth(&self, properties: Option<AuthProperties>) -> Result<(), ClientError> {
let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
let auth = Request::Auth(auth);
self.client.request_tx.send(auth)?;
Ok(())
}
pub fn try_reauth(&self, properties: Option<AuthProperties>) -> Result<(), ClientError> {
let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
let auth = Request::Auth(auth);
self.client.request_tx.try_send(auth)?;
Ok(())
}
fn handle_subscribe<S: Into<String>>(
&self,
topic: S,
qos: QoS,
properties: Option<SubscribeProperties>,
) -> Result<(), ClientError> {
let filter = Filter::new(topic, qos);
let subscribe = Subscribe::new(filter, properties);
if !subscribe_has_valid_filters(&subscribe) {
return Err(ClientError::Request(Box::new(subscribe.into())));
}
self.client.request_tx.send(subscribe.into())?;
Ok(())
}
pub fn subscribe_with_properties<S: Into<String>>(
&self,
topic: S,
qos: QoS,
properties: SubscribeProperties,
) -> Result<(), ClientError> {
self.handle_subscribe(topic, qos, Some(properties))
}
pub fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
self.handle_subscribe(topic, qos, None)
}
pub fn try_subscribe_with_properties<S: Into<String>>(
&self,
topic: S,
qos: QoS,
properties: SubscribeProperties,
) -> Result<(), ClientError> {
self.client
.try_subscribe_with_properties(topic, qos, properties)
}
pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
self.client.try_subscribe(topic, qos)
}
fn handle_subscribe_many<T>(
&self,
topics: T,
properties: Option<SubscribeProperties>,
) -> Result<(), ClientError>
where
T: IntoIterator<Item = Filter>,
{
let subscribe = Subscribe::new_many(topics, properties);
if !subscribe_has_valid_filters(&subscribe) {
return Err(ClientError::Request(Box::new(subscribe.into())));
}
self.client.request_tx.send(subscribe.into())?;
Ok(())
}
pub fn subscribe_many_with_properties<T>(
&self,
topics: T,
properties: SubscribeProperties,
) -> Result<(), ClientError>
where
T: IntoIterator<Item = Filter>,
{
self.handle_subscribe_many(topics, Some(properties))
}
pub fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
where
T: IntoIterator<Item = Filter>,
{
self.handle_subscribe_many(topics, None)
}
pub fn try_subscribe_many_with_properties<T>(
&self,
topics: T,
properties: SubscribeProperties,
) -> Result<(), ClientError>
where
T: IntoIterator<Item = Filter>,
{
self.client
.try_subscribe_many_with_properties(topics, properties)
}
pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
where
T: IntoIterator<Item = Filter>,
{
self.client.try_subscribe_many(topics)
}
fn handle_unsubscribe<S: Into<String>>(
&self,
topic: S,
properties: Option<UnsubscribeProperties>,
) -> Result<(), ClientError> {
let unsubscribe = Unsubscribe::new(topic, properties);
let request = Request::Unsubscribe(unsubscribe);
self.client.request_tx.send(request)?;
Ok(())
}
pub fn unsubscribe_with_properties<S: Into<String>>(
&self,
topic: S,
properties: UnsubscribeProperties,
) -> Result<(), ClientError> {
self.handle_unsubscribe(topic, Some(properties))
}
pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
self.handle_unsubscribe(topic, None)
}
pub fn try_unsubscribe_with_properties<S: Into<String>>(
&self,
topic: S,
properties: UnsubscribeProperties,
) -> Result<(), ClientError> {
self.client
.try_unsubscribe_with_properties(topic, properties)
}
pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
self.client.try_unsubscribe(topic)
}
pub fn disconnect(&self) -> Result<(), ClientError> {
self.handle_disconnect(DisconnectReasonCode::NormalDisconnection, None)
}
pub fn disconnect_with_properties(
&self,
reason: DisconnectReasonCode,
properties: DisconnectProperties,
) -> Result<(), ClientError> {
self.handle_disconnect(reason, Some(properties))
}
fn handle_disconnect(
&self,
reason: DisconnectReasonCode,
properties: Option<DisconnectProperties>,
) -> Result<(), ClientError> {
let request = self.client.build_disconnect_request(reason, properties);
self.client.request_tx.send(request)?;
Ok(())
}
pub fn try_disconnect(&self) -> Result<(), ClientError> {
self.client.try_disconnect()
}
pub fn try_disconnect_with_properties(
&self,
reason: DisconnectReasonCode,
properties: DisconnectProperties,
) -> Result<(), ClientError> {
self.client.handle_try_disconnect(reason, Some(properties))
}
}
#[must_use]
fn subscribe_has_valid_filters(subscribe: &Subscribe) -> bool {
!subscribe.filters.is_empty()
&& subscribe
.filters
.iter()
.all(|filter| valid_filter(&filter.path))
}
#[derive(Debug, Eq, PartialEq)]
pub struct RecvError;
#[derive(Debug, Eq, PartialEq)]
pub enum TryRecvError {
Disconnected,
Empty,
}
#[derive(Debug, Eq, PartialEq)]
pub enum RecvTimeoutError {
Disconnected,
Timeout,
}
pub struct Connection {
pub eventloop: EventLoop,
runtime: Runtime,
}
impl Connection {
fn new(eventloop: EventLoop, runtime: Runtime) -> Connection {
Connection { eventloop, runtime }
}
#[must_use = "Connection should be iterated over a loop to make progress"]
pub fn iter(&mut self) -> Iter<'_> {
Iter { connection: self }
}
pub fn recv(&mut self) -> Result<Result<Event, ConnectionError>, RecvError> {
let f = self.eventloop.poll();
let event = self.runtime.block_on(f);
resolve_event(event).ok_or(RecvError)
}
pub fn try_recv(&mut self) -> Result<Result<Event, ConnectionError>, TryRecvError> {
let f = self.eventloop.poll();
let _guard = self.runtime.enter();
let event = f.now_or_never().ok_or(TryRecvError::Empty)?;
resolve_event(event).ok_or(TryRecvError::Disconnected)
}
pub fn recv_timeout(
&mut self,
duration: Duration,
) -> Result<Result<Event, ConnectionError>, RecvTimeoutError> {
let f = self.eventloop.poll();
let event = self
.runtime
.block_on(async { timeout(duration, f).await })
.map_err(|_| RecvTimeoutError::Timeout)?;
resolve_event(event).ok_or(RecvTimeoutError::Disconnected)
}
}
fn resolve_event(event: Result<Event, ConnectionError>) -> Option<Result<Event, ConnectionError>> {
match event {
Ok(v) => Some(Ok(v)),
Err(ConnectionError::RequestsDone) => {
trace!("Done with requests");
None
}
Err(e) => Some(Err(e)),
}
}
pub struct Iter<'a> {
connection: &'a mut Connection,
}
impl Iterator for Iter<'_> {
type Item = Result<Event, ConnectionError>;
fn next(&mut self) -> Option<Self::Item> {
self.connection.recv().ok()
}
}
#[cfg(test)]
mod test {
use crate::v5::mqttbytes::v5::LastWill;
use super::*;
#[test]
fn calling_iter_twice_on_connection_shouldnt_panic() {
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false, None);
mqttoptions.set_keep_alive(5).set_last_will(will);
let (_, mut connection) = Client::new(mqttoptions, 10);
let _ = connection.iter();
let _ = connection.iter();
}
#[test]
fn should_be_able_to_build_test_client_from_channel() {
let (tx, rx) = flume::bounded(1);
let client = Client::from_sender(tx);
client
.publish("hello/world", QoS::ExactlyOnce, false, "good bye")
.expect("Should be able to publish");
let _ = rx.try_recv().expect("Should have message");
}
#[test]
fn test_reauth() {
let (client, mut connection) =
Client::new(MqttOptions::new("test-1", "localhost", 1883), 10);
let props = AuthProperties {
method: Some("test".to_string()),
data: Some(Bytes::from("test")),
reason: None,
user_properties: vec![],
};
let _ = client
.reauth(Some(props.clone()))
.expect("Should be able to reauth");
let _ = connection.iter().next().expect("Should have event");
let _ = client
.try_reauth(Some(props.clone()))
.expect("Should be able to reauth");
let _ = connection.iter().next().expect("Should have event");
}
#[test]
fn can_publish_with_validated_topic() {
let (tx, rx) = flume::bounded(1);
let client = Client::from_sender(tx);
let valid_topic = ValidatedTopic::new("hello/world").unwrap();
client
.publish(valid_topic, QoS::ExactlyOnce, false, "good bye")
.expect("Should be able to publish");
let _ = rx.try_recv().expect("Should have message");
}
#[test]
fn validated_topic_ergonomics() {
let valid_topic = ValidatedTopic::new("hello/world").unwrap();
let valid_topic_can_be_cloned = valid_topic.clone();
assert_eq!(valid_topic, valid_topic_can_be_cloned);
}
#[test]
fn creating_invalid_validated_topic_fails() {
assert_eq!(
ValidatedTopic::new("a/+/b"),
Err(InvalidTopic("a/+/b".to_string()))
);
}
#[test]
fn publish_with_properties_accepts_validated_topic() {
let (tx, rx) = flume::bounded(1);
let client = Client::from_sender(tx);
let valid_topic = ValidatedTopic::new("hello/world").unwrap();
client
.publish_with_properties(
valid_topic,
QoS::ExactlyOnce,
false,
"good bye",
PublishProperties::default(),
)
.expect("Should be able to publish");
let _ = rx.try_recv().expect("Should have message");
}
#[test]
fn try_publish_accepts_validated_topic() {
let (tx, rx) = flume::bounded(1);
let client = Client::from_sender(tx);
let valid_topic = ValidatedTopic::new("hello/world").unwrap();
client
.try_publish(valid_topic, QoS::ExactlyOnce, false, "good bye")
.expect("Should be able to publish");
let _ = rx.try_recv().expect("Should have message");
}
#[test]
fn try_publish_with_properties_accepts_validated_topic() {
let (tx, rx) = flume::bounded(1);
let client = Client::from_sender(tx);
let valid_topic = ValidatedTopic::new("hello/world").unwrap();
client
.try_publish_with_properties(
valid_topic,
QoS::ExactlyOnce,
false,
"good bye",
PublishProperties::default(),
)
.expect("Should be able to publish");
let _ = rx.try_recv().expect("Should have message");
}
#[test]
fn publishing_invalid_raw_topic_fails() {
let (tx, _) = flume::bounded(1);
let client = Client::from_sender(tx);
let err = client
.publish("a/+/b", QoS::ExactlyOnce, false, "good bye")
.expect_err("Invalid publish topic should fail");
assert!(matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_))));
}
#[test]
fn async_publish_paths_accept_validated_topic() {
let (tx, rx) = flume::bounded(4);
let client = AsyncClient::from_senders(tx);
let runtime = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(async {
client
.publish(
ValidatedTopic::new("hello/world").unwrap(),
QoS::ExactlyOnce,
false,
"good bye",
)
.await
.expect("Should be able to publish");
client
.publish_with_properties(
ValidatedTopic::new("hello/world").unwrap(),
QoS::ExactlyOnce,
false,
"good bye",
PublishProperties::default(),
)
.await
.expect("Should be able to publish");
client
.publish_bytes(
ValidatedTopic::new("hello/world").unwrap(),
QoS::ExactlyOnce,
false,
Bytes::from_static(b"good bye"),
)
.await
.expect("Should be able to publish");
client
.publish_bytes_with_properties(
ValidatedTopic::new("hello/world").unwrap(),
QoS::ExactlyOnce,
false,
Bytes::from_static(b"good bye"),
PublishProperties::default(),
)
.await
.expect("Should be able to publish");
});
let _ = rx.try_recv().expect("Should have message");
let _ = rx.try_recv().expect("Should have message");
let _ = rx.try_recv().expect("Should have message");
let _ = rx.try_recv().expect("Should have message");
}
#[test]
fn async_try_publish_paths_accept_validated_topic() {
let (tx, rx) = flume::bounded(4);
let client = AsyncClient::from_senders(tx);
client
.try_publish(
ValidatedTopic::new("hello/world").unwrap(),
QoS::ExactlyOnce,
false,
"good bye",
)
.expect("Should be able to publish");
client
.try_publish_with_properties(
ValidatedTopic::new("hello/world").unwrap(),
QoS::ExactlyOnce,
false,
"good bye",
PublishProperties::default(),
)
.expect("Should be able to publish");
let _ = rx.try_recv().expect("Should have message");
let _ = rx.try_recv().expect("Should have message");
}
#[test]
fn async_publishing_invalid_raw_topic_fails() {
let (tx, _) = flume::bounded(1);
let client = AsyncClient::from_senders(tx);
let runtime = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(async {
let err = client
.publish("a/+/b", QoS::ExactlyOnce, false, "good bye")
.await
.expect_err("Invalid publish topic should fail");
assert!(
matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_)))
);
let err = client
.publish_bytes(
"a/+/b",
QoS::ExactlyOnce,
false,
Bytes::from_static(b"good bye"),
)
.await
.expect_err("Invalid publish topic should fail");
assert!(
matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_)))
);
});
}
#[test]
fn disconnect_with_properties_builds_disconnect_request() {
let (tx, rx) = flume::bounded(1);
let client = Client::from_sender(tx);
let properties = DisconnectProperties {
session_expiry_interval: Some(120),
reason_string: Some("closing".to_string()),
user_properties: vec![("source".to_string(), "test".to_string())],
server_reference: Some("backup-broker".to_string()),
};
client
.disconnect_with_properties(
DisconnectReasonCode::ImplementationSpecificError,
properties.clone(),
)
.expect("disconnect_with_properties should enqueue request");
let request = rx.try_recv().expect("Should have disconnect request");
match request {
Request::Disconnect(disconnect) => {
assert_eq!(
disconnect.reason_code,
DisconnectReasonCode::ImplementationSpecificError
);
assert_eq!(disconnect.properties, Some(properties));
}
request => panic!("Expected disconnect request, got {:?}", request),
}
}
#[test]
fn try_disconnect_with_properties_builds_disconnect_request() {
let (tx, rx) = flume::bounded(1);
let client = Client::from_sender(tx);
let properties = DisconnectProperties {
session_expiry_interval: Some(360),
reason_string: Some("maintenance".to_string()),
user_properties: vec![("env".to_string(), "test".to_string())],
server_reference: None,
};
client
.try_disconnect_with_properties(
DisconnectReasonCode::ServerShuttingDown,
properties.clone(),
)
.expect("try_disconnect_with_properties should enqueue request");
let request = rx.try_recv().expect("Should have disconnect request");
match request {
Request::Disconnect(disconnect) => {
assert_eq!(
disconnect.reason_code,
DisconnectReasonCode::ServerShuttingDown
);
assert_eq!(disconnect.properties, Some(properties));
}
request => panic!("Expected disconnect request, got {:?}", request),
}
}
#[test]
fn async_disconnect_with_properties_builds_disconnect_request() {
let (tx, rx) = flume::bounded(1);
let client = AsyncClient::from_senders(tx);
let runtime = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let properties = DisconnectProperties {
session_expiry_interval: Some(42),
reason_string: Some("done".to_string()),
user_properties: vec![("k".to_string(), "v".to_string())],
server_reference: Some("fallback".to_string()),
};
runtime.block_on(async {
client
.disconnect_with_properties(
DisconnectReasonCode::UseAnotherServer,
properties.clone(),
)
.await
.expect("disconnect_with_properties should enqueue request");
});
let request = rx.try_recv().expect("Should have disconnect request");
match request {
Request::Disconnect(disconnect) => {
assert_eq!(
disconnect.reason_code,
DisconnectReasonCode::UseAnotherServer
);
assert_eq!(disconnect.properties, Some(properties));
}
request => panic!("Expected disconnect request, got {:?}", request),
}
}
#[test]
fn async_try_disconnect_with_properties_builds_disconnect_request() {
let (tx, rx) = flume::bounded(1);
let client = AsyncClient::from_senders(tx);
let properties = DisconnectProperties {
session_expiry_interval: Some(7),
reason_string: Some("bye".to_string()),
user_properties: vec![("actor".to_string(), "test".to_string())],
server_reference: None,
};
client
.try_disconnect_with_properties(
DisconnectReasonCode::AdministrativeAction,
properties.clone(),
)
.expect("try_disconnect_with_properties should enqueue request");
let request = rx.try_recv().expect("Should have disconnect request");
match request {
Request::Disconnect(disconnect) => {
assert_eq!(
disconnect.reason_code,
DisconnectReasonCode::AdministrativeAction
);
assert_eq!(disconnect.properties, Some(properties));
}
request => panic!("Expected disconnect request, got {:?}", request),
}
}
}