Skip to main content

rumqttc/
client.rs

//! This module offers high-level synchronous and asynchronous abstractions over
//! the async event loop.
3use std::borrow::Cow;
4use std::time::Duration;
5
6use super::eventloop::RequestEnvelope;
7use super::mqttbytes::QoS;
8use super::mqttbytes::v5::{
9    Auth, AuthProperties, AuthReasonCode, Filter, PubAck, PubRec, Publish, PublishProperties,
10    Subscribe, SubscribeProperties, Unsubscribe, UnsubscribeProperties,
11};
12use super::{
13    ConnectionError, Disconnect, DisconnectProperties, DisconnectReasonCode, Event, EventLoop,
14    MqttOptions, Request,
15};
16use crate::notice::{PublishNoticeTx, RequestNoticeTx};
17use crate::{PublishNotice, RequestNotice, valid_filter, valid_topic};
18
19use bytes::Bytes;
20use flume::{SendError, Sender, TrySendError};
21use futures_util::FutureExt;
22use tokio::runtime::{self, Runtime};
23use tokio::time::timeout;
24
/// An error returned when a topic string fails validation against the MQTT specification.
///
/// Wraps the rejected topic string; the string is embedded in the error message.
#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)]
#[error("Invalid MQTT topic: '{0}'")]
pub struct InvalidTopic(String);
29
30/// A newtype wrapper that guarantees its inner `String` is a valid MQTT topic.
31///
32/// This type prevents the cost of repeated validation for topics that are used
33/// frequently. It can only be constructed via [`ValidatedTopic::new`], which
34/// performs a one-time validation check.
35///
36/// Use this when publishing repeatedly to the same topic to avoid per-call
37/// validation overhead in publish APIs that accept [`PublishTopic`].
38#[derive(Debug, Clone, PartialEq, Eq)]
39pub struct ValidatedTopic(String);
40
41impl ValidatedTopic {
42    /// Constructs a new `ValidatedTopic` after validating the input string.
43    ///
44    /// # Errors
45    ///
46    /// Returns [`InvalidTopic`] if the topic string does not conform to the MQTT specification.
47    pub fn new<S: Into<String>>(topic: S) -> Result<Self, InvalidTopic> {
48        let topic_string = topic.into();
49        if valid_topic(&topic_string) {
50            Ok(Self(topic_string))
51        } else {
52            Err(InvalidTopic(topic_string))
53        }
54    }
55}
56
57impl From<ValidatedTopic> for String {
58    fn from(topic: ValidatedTopic) -> Self {
59        topic.0
60    }
61}
62
63/// Topic argument accepted by publish APIs.
64///
65/// `ValidatedTopic` variants skip per-call validation while string variants are
66/// validated for MQTT topic correctness.
67pub enum PublishTopic {
68    /// Raw topic input that must be validated before publishing.
69    Unvalidated(String),
70    /// Topic that has already been validated once.
71    Validated(ValidatedTopic),
72}
73
74impl PublishTopic {
75    fn into_string_and_validation(self) -> (String, bool) {
76        match self {
77            Self::Unvalidated(topic) => (topic, true),
78            Self::Validated(topic) => (topic.0, false),
79        }
80    }
81}
82
83impl From<ValidatedTopic> for PublishTopic {
84    fn from(topic: ValidatedTopic) -> Self {
85        Self::Validated(topic)
86    }
87}
88
89impl From<String> for PublishTopic {
90    fn from(topic: String) -> Self {
91        Self::Unvalidated(topic)
92    }
93}
94
95impl From<&str> for PublishTopic {
96    fn from(topic: &str) -> Self {
97        Self::Unvalidated(topic.to_owned())
98    }
99}
100
101impl From<&String> for PublishTopic {
102    fn from(topic: &String) -> Self {
103        Self::Unvalidated(topic.clone())
104    }
105}
106
107impl From<Cow<'_, str>> for PublishTopic {
108    fn from(topic: Cow<'_, str>) -> Self {
109        Self::Unvalidated(topic.into_owned())
110    }
111}
112
/// Errors returned by the client request APIs.
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
    /// A request could not be handed to the event loop through the
    /// awaiting/blocking send path, or was rejected by client-side validation
    /// in a non-`try_` API. Carries the request that was not sent.
    #[error("Failed to send mqtt requests to eventloop")]
    Request(Box<Request>),
    /// A request could not be handed to the event loop through the
    /// non-blocking `try_` send path, or was rejected by client-side
    /// validation in a `try_` API. Carries the request that was not sent.
    #[error("Failed to send mqtt requests to eventloop")]
    TryRequest(Box<Request>),
    /// A tracked (`*_tracked`) API was called on a client whose request
    /// channel does not carry notices.
    #[error("Tracked request API is unavailable for this client instance")]
    TrackingUnavailable,
}
123
124impl From<SendError<Request>> for ClientError {
125    fn from(e: SendError<Request>) -> Self {
126        Self::Request(Box::new(e.into_inner()))
127    }
128}
129
130impl From<TrySendError<Request>> for ClientError {
131    fn from(e: TrySendError<Request>) -> Self {
132        Self::TryRequest(Box::new(e.into_inner()))
133    }
134}
135
/// Sending half of the request channel held by a client.
#[derive(Clone, Debug)]
enum RequestSender {
    /// Channel carrying bare requests; tracked request APIs are unavailable.
    Plain(Sender<Request>),
    /// Channel carrying request envelopes, which can also carry a notice
    /// sender for tracked requests.
    WithNotice(Sender<RequestEnvelope>),
}
141
142fn into_request(envelope: RequestEnvelope) -> Request {
143    let (request, _notice) = envelope.into_parts();
144    request
145}
146
147fn map_send_envelope_error(err: SendError<RequestEnvelope>) -> ClientError {
148    ClientError::Request(Box::new(into_request(err.into_inner())))
149}
150
151fn map_try_send_envelope_error(err: TrySendError<RequestEnvelope>) -> ClientError {
152    match err {
153        TrySendError::Full(envelope) | TrySendError::Disconnected(envelope) => {
154            ClientError::TryRequest(Box::new(into_request(envelope)))
155        }
156    }
157}
158
159/// Prepared acknowledgement packet for manual acknowledgement mode.
160#[derive(Clone, Debug, PartialEq, Eq)]
161pub enum ManualAck {
162    PubAck(PubAck),
163    PubRec(PubRec),
164}
165
166impl ManualAck {
167    fn into_request(self) -> Request {
168        match self {
169            Self::PubAck(ack) => Request::PubAck(ack),
170            Self::PubRec(rec) => Request::PubRec(rec),
171        }
172    }
173}
174
/// An asynchronous client that communicates with the MQTT `EventLoop`.
///
/// This is cloneable and can be used to asynchronously [`publish`](`AsyncClient::publish`) and
/// [`subscribe`](`AsyncClient::subscribe`) through the `EventLoop`, which must be polled concurrently.
///
/// **NOTE**: The `EventLoop` must be regularly polled in order to send, receive and process packets
/// from the broker, i.e. move ahead.
#[derive(Clone, Debug)]
pub struct AsyncClient {
    // Sending half of the request channel used to hand requests to the event loop.
    request_tx: RequestSender,
}
186
187impl AsyncClient {
188    /// Create a new `AsyncClient`.
189    ///
190    /// `cap` specifies the capacity of the bounded async channel.
191    pub fn new(options: MqttOptions, cap: usize) -> (Self, EventLoop) {
192        let (eventloop, request_tx) = EventLoop::new_for_async_client(options, cap);
193        let client = Self {
194            request_tx: RequestSender::WithNotice(request_tx),
195        };
196
197        (client, eventloop)
198    }
199
    /// Create a new `AsyncClient` from a channel `Sender`.
    ///
    /// This is mostly useful for creating a test instance where you can
    /// listen on the corresponding receiver.
    ///
    /// The resulting client wraps the sender as `RequestSender::Plain`, so the
    /// tracked send helpers of this client return
    /// [`ClientError::TrackingUnavailable`].
    #[must_use]
    pub const fn from_senders(request_tx: Sender<Request>) -> Self {
        Self {
            request_tx: RequestSender::Plain(request_tx),
        }
    }
210
211    async fn send_request_async(&self, request: Request) -> Result<(), ClientError> {
212        match &self.request_tx {
213            RequestSender::Plain(tx) => tx.send_async(request).await.map_err(ClientError::from),
214            RequestSender::WithNotice(tx) => tx
215                .send_async(RequestEnvelope::plain(request))
216                .await
217                .map_err(map_send_envelope_error),
218        }
219    }
220
221    fn try_send_request(&self, request: Request) -> Result<(), ClientError> {
222        match &self.request_tx {
223            RequestSender::Plain(tx) => tx.try_send(request).map_err(ClientError::from),
224            RequestSender::WithNotice(tx) => tx
225                .try_send(RequestEnvelope::plain(request))
226                .map_err(map_try_send_envelope_error),
227        }
228    }
229
230    fn send_request(&self, request: Request) -> Result<(), ClientError> {
231        match &self.request_tx {
232            RequestSender::Plain(tx) => tx.send(request).map_err(ClientError::from),
233            RequestSender::WithNotice(tx) => tx
234                .send(RequestEnvelope::plain(request))
235                .map_err(map_send_envelope_error),
236        }
237    }
238
239    async fn send_tracked_publish_async(
240        &self,
241        publish: Publish,
242    ) -> Result<PublishNotice, ClientError> {
243        let RequestSender::WithNotice(request_tx) = &self.request_tx else {
244            return Err(ClientError::TrackingUnavailable);
245        };
246
247        let (notice_tx, notice) = PublishNoticeTx::new();
248        request_tx
249            .send_async(RequestEnvelope::tracked_publish(publish, notice_tx))
250            .await
251            .map_err(map_send_envelope_error)?;
252        Ok(notice)
253    }
254
255    fn try_send_tracked_publish(&self, publish: Publish) -> Result<PublishNotice, ClientError> {
256        let RequestSender::WithNotice(request_tx) = &self.request_tx else {
257            return Err(ClientError::TrackingUnavailable);
258        };
259
260        let (notice_tx, notice) = PublishNoticeTx::new();
261        request_tx
262            .try_send(RequestEnvelope::tracked_publish(publish, notice_tx))
263            .map_err(map_try_send_envelope_error)?;
264        Ok(notice)
265    }
266
267    async fn send_tracked_subscribe_async(
268        &self,
269        subscribe: Subscribe,
270    ) -> Result<RequestNotice, ClientError> {
271        let RequestSender::WithNotice(request_tx) = &self.request_tx else {
272            return Err(ClientError::TrackingUnavailable);
273        };
274
275        let (notice_tx, notice) = RequestNoticeTx::new();
276        request_tx
277            .send_async(RequestEnvelope::tracked_subscribe(subscribe, notice_tx))
278            .await
279            .map_err(map_send_envelope_error)?;
280        Ok(notice)
281    }
282
283    fn try_send_tracked_subscribe(
284        &self,
285        subscribe: Subscribe,
286    ) -> Result<RequestNotice, ClientError> {
287        let RequestSender::WithNotice(request_tx) = &self.request_tx else {
288            return Err(ClientError::TrackingUnavailable);
289        };
290
291        let (notice_tx, notice) = RequestNoticeTx::new();
292        request_tx
293            .try_send(RequestEnvelope::tracked_subscribe(subscribe, notice_tx))
294            .map_err(map_try_send_envelope_error)?;
295        Ok(notice)
296    }
297
298    async fn send_tracked_unsubscribe_async(
299        &self,
300        unsubscribe: Unsubscribe,
301    ) -> Result<RequestNotice, ClientError> {
302        let RequestSender::WithNotice(request_tx) = &self.request_tx else {
303            return Err(ClientError::TrackingUnavailable);
304        };
305
306        let (notice_tx, notice) = RequestNoticeTx::new();
307        request_tx
308            .send_async(RequestEnvelope::tracked_unsubscribe(unsubscribe, notice_tx))
309            .await
310            .map_err(map_send_envelope_error)?;
311        Ok(notice)
312    }
313
314    fn try_send_tracked_unsubscribe(
315        &self,
316        unsubscribe: Unsubscribe,
317    ) -> Result<RequestNotice, ClientError> {
318        let RequestSender::WithNotice(request_tx) = &self.request_tx else {
319            return Err(ClientError::TrackingUnavailable);
320        };
321
322        let (notice_tx, notice) = RequestNoticeTx::new();
323        request_tx
324            .try_send(RequestEnvelope::tracked_unsubscribe(unsubscribe, notice_tx))
325            .map_err(map_try_send_envelope_error)?;
326        Ok(notice)
327    }
328
329    /// Sends a MQTT Publish to the `EventLoop`.
330    async fn handle_publish<T, P>(
331        &self,
332        topic: T,
333        qos: QoS,
334        retain: bool,
335        payload: P,
336        properties: Option<PublishProperties>,
337    ) -> Result<(), ClientError>
338    where
339        T: Into<PublishTopic>,
340        P: Into<Bytes>,
341    {
342        let (topic, needs_validation) = topic.into().into_string_and_validation();
343        let invalid_topic = (needs_validation && !valid_topic(&topic))
344            || empty_topic_without_valid_alias(&topic, properties.as_ref());
345        let mut publish = Publish::new(topic, qos, payload, properties);
346        publish.retain = retain;
347        let publish = Request::Publish(publish);
348
349        if invalid_topic {
350            return Err(ClientError::Request(Box::new(publish)));
351        }
352
353        self.send_request_async(publish).await?;
354        Ok(())
355    }
356
357    async fn handle_publish_tracked<T, P>(
358        &self,
359        topic: T,
360        qos: QoS,
361        retain: bool,
362        payload: P,
363        properties: Option<PublishProperties>,
364    ) -> Result<PublishNotice, ClientError>
365    where
366        T: Into<PublishTopic>,
367        P: Into<Bytes>,
368    {
369        let (topic, needs_validation) = topic.into().into_string_and_validation();
370        let invalid_topic = (needs_validation && !valid_topic(&topic))
371            || empty_topic_without_valid_alias(&topic, properties.as_ref());
372        let mut publish = Publish::new(topic, qos, payload, properties);
373        publish.retain = retain;
374        let request = Request::Publish(publish.clone());
375
376        if invalid_topic {
377            return Err(ClientError::Request(Box::new(request)));
378        }
379
380        self.send_tracked_publish_async(publish).await
381    }
382
383    /// Sends a MQTT Publish with properties to the `EventLoop`.
384    ///
385    /// # Errors
386    ///
387    /// Returns an error if the topic or topic alias usage is invalid, or if
388    /// the request cannot be queued on the event loop.
389    pub async fn publish_with_properties<T, P>(
390        &self,
391        topic: T,
392        qos: QoS,
393        retain: bool,
394        payload: P,
395        properties: PublishProperties,
396    ) -> Result<(), ClientError>
397    where
398        T: Into<PublishTopic>,
399        P: Into<Bytes>,
400    {
401        self.handle_publish(topic, qos, retain, payload, Some(properties))
402            .await
403    }
404
405    /// Sends a MQTT Publish with properties to the `EventLoop` and returns a tracked notice.
406    ///
407    /// # Errors
408    ///
409    /// Returns an error if the topic or topic alias usage is invalid, or if
410    /// the request cannot be queued on the event loop.
411    pub async fn publish_with_properties_tracked<T, P>(
412        &self,
413        topic: T,
414        qos: QoS,
415        retain: bool,
416        payload: P,
417        properties: PublishProperties,
418    ) -> Result<PublishNotice, ClientError>
419    where
420        T: Into<PublishTopic>,
421        P: Into<Bytes>,
422    {
423        self.handle_publish_tracked(topic, qos, retain, payload, Some(properties))
424            .await
425    }
426
427    /// Sends a MQTT Publish to the `EventLoop`.
428    ///
429    /// # Errors
430    ///
431    /// Returns an error if the topic or topic alias usage is invalid, or if
432    /// the request cannot be queued on the event loop.
433    pub async fn publish<T, P>(
434        &self,
435        topic: T,
436        qos: QoS,
437        retain: bool,
438        payload: P,
439    ) -> Result<(), ClientError>
440    where
441        T: Into<PublishTopic>,
442        P: Into<Bytes>,
443    {
444        self.handle_publish(topic, qos, retain, payload, None).await
445    }
446
447    /// Sends a MQTT Publish to the `EventLoop` and returns a tracked notice.
448    ///
449    /// # Errors
450    ///
451    /// Returns an error if the topic or topic alias usage is invalid, or if
452    /// the request cannot be queued on the event loop.
453    pub async fn publish_tracked<T, P>(
454        &self,
455        topic: T,
456        qos: QoS,
457        retain: bool,
458        payload: P,
459    ) -> Result<PublishNotice, ClientError>
460    where
461        T: Into<PublishTopic>,
462        P: Into<Bytes>,
463    {
464        self.handle_publish_tracked(topic, qos, retain, payload, None)
465            .await
466    }
467
468    /// Attempts to send a MQTT Publish to the `EventLoop`.
469    fn handle_try_publish<T, P>(
470        &self,
471        topic: T,
472        qos: QoS,
473        retain: bool,
474        payload: P,
475        properties: Option<PublishProperties>,
476    ) -> Result<(), ClientError>
477    where
478        T: Into<PublishTopic>,
479        P: Into<Bytes>,
480    {
481        let (topic, needs_validation) = topic.into().into_string_and_validation();
482        let invalid_topic = (needs_validation && !valid_topic(&topic))
483            || empty_topic_without_valid_alias(&topic, properties.as_ref());
484        let mut publish = Publish::new(topic, qos, payload, properties);
485        publish.retain = retain;
486        let publish = Request::Publish(publish);
487
488        if invalid_topic {
489            return Err(ClientError::TryRequest(Box::new(publish)));
490        }
491
492        self.try_send_request(publish)?;
493        Ok(())
494    }
495
496    fn handle_try_publish_tracked<T, P>(
497        &self,
498        topic: T,
499        qos: QoS,
500        retain: bool,
501        payload: P,
502        properties: Option<PublishProperties>,
503    ) -> Result<PublishNotice, ClientError>
504    where
505        T: Into<PublishTopic>,
506        P: Into<Bytes>,
507    {
508        let (topic, needs_validation) = topic.into().into_string_and_validation();
509        let invalid_topic = (needs_validation && !valid_topic(&topic))
510            || empty_topic_without_valid_alias(&topic, properties.as_ref());
511        let mut publish = Publish::new(topic, qos, payload, properties);
512        publish.retain = retain;
513        let request = Request::Publish(publish.clone());
514
515        if invalid_topic {
516            return Err(ClientError::TryRequest(Box::new(request)));
517        }
518
519        self.try_send_tracked_publish(publish)
520    }
521
522    /// Attempts to send a MQTT Publish with properties to the `EventLoop`.
523    ///
524    /// # Errors
525    ///
526    /// Returns an error if the topic or topic alias usage is invalid, or if
527    /// the request cannot be queued immediately on the event loop.
528    pub fn try_publish_with_properties<T, P>(
529        &self,
530        topic: T,
531        qos: QoS,
532        retain: bool,
533        payload: P,
534        properties: PublishProperties,
535    ) -> Result<(), ClientError>
536    where
537        T: Into<PublishTopic>,
538        P: Into<Bytes>,
539    {
540        self.handle_try_publish(topic, qos, retain, payload, Some(properties))
541    }
542
543    /// Attempts to send a MQTT Publish with properties to the `EventLoop` and returns a tracked notice.
544    ///
545    /// # Errors
546    ///
547    /// Returns an error if the topic or topic alias usage is invalid, or if
548    /// the request cannot be queued immediately on the event loop.
549    pub fn try_publish_with_properties_tracked<T, P>(
550        &self,
551        topic: T,
552        qos: QoS,
553        retain: bool,
554        payload: P,
555        properties: PublishProperties,
556    ) -> Result<PublishNotice, ClientError>
557    where
558        T: Into<PublishTopic>,
559        P: Into<Bytes>,
560    {
561        self.handle_try_publish_tracked(topic, qos, retain, payload, Some(properties))
562    }
563
564    /// Attempts to send a MQTT Publish to the `EventLoop`.
565    ///
566    /// # Errors
567    ///
568    /// Returns an error if the topic or topic alias usage is invalid, or if
569    /// the request cannot be queued immediately on the event loop.
570    pub fn try_publish<T, P>(
571        &self,
572        topic: T,
573        qos: QoS,
574        retain: bool,
575        payload: P,
576    ) -> Result<(), ClientError>
577    where
578        T: Into<PublishTopic>,
579        P: Into<Bytes>,
580    {
581        self.handle_try_publish(topic, qos, retain, payload, None)
582    }
583
584    /// Attempts to send a MQTT Publish to the `EventLoop` and returns a tracked notice.
585    ///
586    /// # Errors
587    ///
588    /// Returns an error if the topic or topic alias usage is invalid, or if
589    /// the request cannot be queued immediately on the event loop.
590    pub fn try_publish_tracked<T, P>(
591        &self,
592        topic: T,
593        qos: QoS,
594        retain: bool,
595        payload: P,
596    ) -> Result<PublishNotice, ClientError>
597    where
598        T: Into<PublishTopic>,
599        P: Into<Bytes>,
600    {
601        self.handle_try_publish_tracked(topic, qos, retain, payload, None)
602    }
603
    /// Prepares a MQTT PubAck/PubRec packet for manual acknowledgement.
    ///
    /// Returns `None` for `QoS0` publishes, which do not require acknowledgement.
    ///
    /// This is a thin wrapper that forwards to the free function `prepare_ack`;
    /// it does not read any client state.
    pub const fn prepare_ack(&self, publish: &Publish) -> Option<ManualAck> {
        prepare_ack(publish)
    }
610
611    /// Sends a prepared MQTT PubAck/PubRec to the `EventLoop`.
612    ///
613    /// This is useful when `manual_acks` is enabled and acknowledgement must be deferred.
614    ///
615    /// # Errors
616    ///
617    /// Returns an error if the acknowledgement cannot be queued on the event
618    /// loop.
619    pub async fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
620        self.send_request_async(ack.into_request()).await?;
621        Ok(())
622    }
623
624    /// Attempts to send a prepared MQTT PubAck/PubRec to the `EventLoop`.
625    ///
626    /// # Errors
627    ///
628    /// Returns an error if the acknowledgement cannot be queued immediately on
629    /// the event loop.
630    pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
631        self.try_send_request(ack.into_request())?;
632        Ok(())
633    }
634
635    /// Sends a MQTT PubAck/PubRec to the `EventLoop` based on publish `QoS`.
636    /// Only needed if the `manual_acks` flag is set.
637    ///
638    /// # Errors
639    ///
640    /// Returns an error if the derived acknowledgement cannot be queued on the
641    /// event loop.
642    pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
643        if let Some(ack) = self.prepare_ack(publish) {
644            self.manual_ack(ack).await?;
645        }
646        Ok(())
647    }
648
649    /// Attempts to send a MQTT PubAck/PubRec to the `EventLoop` based on publish `QoS`.
650    /// Only needed if the `manual_acks` flag is set.
651    ///
652    /// # Errors
653    ///
654    /// Returns an error if the derived acknowledgement cannot be queued
655    /// immediately on the event loop.
656    pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
657        if let Some(ack) = self.prepare_ack(publish) {
658            self.try_manual_ack(ack)?;
659        }
660        Ok(())
661    }
662
663    /// Sends a MQTT AUTH packet to the `EventLoop`.
664    ///
665    /// # Errors
666    ///
667    /// Returns an error if the AUTH packet cannot be queued on the event loop.
668    pub async fn reauth(&self, properties: Option<AuthProperties>) -> Result<(), ClientError> {
669        let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
670        let auth = Request::Auth(auth);
671        self.send_request_async(auth).await?;
672        Ok(())
673    }
674
675    /// Attempts to send a MQTT AUTH packet to the `EventLoop`.
676    ///
677    /// # Errors
678    ///
679    /// Returns an error if the AUTH packet cannot be queued immediately on the
680    /// event loop.
681    pub fn try_reauth(&self, properties: Option<AuthProperties>) -> Result<(), ClientError> {
682        let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
683        let auth = Request::Auth(auth);
684        self.try_send_request(auth)?;
685        Ok(())
686    }
687
688    /// Sends a MQTT Publish to the `EventLoop`
689    async fn handle_publish_bytes<T>(
690        &self,
691        topic: T,
692        qos: QoS,
693        retain: bool,
694        payload: Bytes,
695        properties: Option<PublishProperties>,
696    ) -> Result<(), ClientError>
697    where
698        T: Into<PublishTopic>,
699    {
700        let (topic, needs_validation) = topic.into().into_string_and_validation();
701        let invalid_topic = (needs_validation && !valid_topic(&topic))
702            || empty_topic_without_valid_alias(&topic, properties.as_ref());
703        let mut publish = Publish::new(topic, qos, payload, properties);
704        publish.retain = retain;
705        let publish = Request::Publish(publish);
706
707        if invalid_topic {
708            return Err(ClientError::Request(Box::new(publish)));
709        }
710
711        self.send_request_async(publish).await?;
712        Ok(())
713    }
714
715    async fn handle_publish_bytes_tracked<T>(
716        &self,
717        topic: T,
718        qos: QoS,
719        retain: bool,
720        payload: Bytes,
721        properties: Option<PublishProperties>,
722    ) -> Result<PublishNotice, ClientError>
723    where
724        T: Into<PublishTopic>,
725    {
726        let (topic, needs_validation) = topic.into().into_string_and_validation();
727        let invalid_topic = (needs_validation && !valid_topic(&topic))
728            || empty_topic_without_valid_alias(&topic, properties.as_ref());
729        let mut publish = Publish::new(topic, qos, payload, properties);
730        publish.retain = retain;
731        let request = Request::Publish(publish.clone());
732
733        if invalid_topic {
734            return Err(ClientError::Request(Box::new(request)));
735        }
736
737        self.send_tracked_publish_async(publish).await
738    }
739
740    /// Sends a MQTT Publish with properties to the `EventLoop`.
741    ///
742    /// # Errors
743    ///
744    /// Returns an error if the topic or topic alias usage is invalid, or if
745    /// the request cannot be queued on the event loop.
746    pub async fn publish_bytes_with_properties<T>(
747        &self,
748        topic: T,
749        qos: QoS,
750        retain: bool,
751        payload: Bytes,
752        properties: PublishProperties,
753    ) -> Result<(), ClientError>
754    where
755        T: Into<PublishTopic>,
756    {
757        self.handle_publish_bytes(topic, qos, retain, payload, Some(properties))
758            .await
759    }
760
761    /// Sends a MQTT Publish with `Bytes` payload and properties, returning a tracked notice.
762    ///
763    /// # Errors
764    ///
765    /// Returns an error if the topic or topic alias usage is invalid, or if
766    /// the request cannot be queued on the event loop.
767    pub async fn publish_bytes_with_properties_tracked<T>(
768        &self,
769        topic: T,
770        qos: QoS,
771        retain: bool,
772        payload: Bytes,
773        properties: PublishProperties,
774    ) -> Result<PublishNotice, ClientError>
775    where
776        T: Into<PublishTopic>,
777    {
778        self.handle_publish_bytes_tracked(topic, qos, retain, payload, Some(properties))
779            .await
780    }
781
782    /// Sends a MQTT Publish with `Bytes` payload to the `EventLoop`.
783    ///
784    /// # Errors
785    ///
786    /// Returns an error if the topic or topic alias usage is invalid, or if
787    /// the request cannot be queued on the event loop.
788    pub async fn publish_bytes<T>(
789        &self,
790        topic: T,
791        qos: QoS,
792        retain: bool,
793        payload: Bytes,
794    ) -> Result<(), ClientError>
795    where
796        T: Into<PublishTopic>,
797    {
798        self.handle_publish_bytes(topic, qos, retain, payload, None)
799            .await
800    }
801
802    /// Sends a MQTT Publish with `Bytes` payload to the `EventLoop` and returns a tracked notice.
803    ///
804    /// # Errors
805    ///
806    /// Returns an error if the topic or topic alias usage is invalid, or if
807    /// the request cannot be queued on the event loop.
808    pub async fn publish_bytes_tracked<T>(
809        &self,
810        topic: T,
811        qos: QoS,
812        retain: bool,
813        payload: Bytes,
814    ) -> Result<PublishNotice, ClientError>
815    where
816        T: Into<PublishTopic>,
817    {
818        self.handle_publish_bytes_tracked(topic, qos, retain, payload, None)
819            .await
820    }
821
822    /// Sends a MQTT Subscribe to the `EventLoop`
823    async fn handle_subscribe<S: Into<String>>(
824        &self,
825        topic: S,
826        qos: QoS,
827        properties: Option<SubscribeProperties>,
828    ) -> Result<(), ClientError> {
829        let filter = Filter::new(topic, qos);
830        let subscribe = Subscribe::new(filter, properties);
831        if !subscribe_has_valid_filters(&subscribe) {
832            return Err(ClientError::Request(Box::new(subscribe.into())));
833        }
834
835        self.send_request_async(subscribe.into()).await?;
836        Ok(())
837    }
838
839    async fn handle_subscribe_tracked<S: Into<String>>(
840        &self,
841        topic: S,
842        qos: QoS,
843        properties: Option<SubscribeProperties>,
844    ) -> Result<RequestNotice, ClientError> {
845        let filter = Filter::new(topic, qos);
846        let subscribe = Subscribe::new(filter, properties);
847        if !subscribe_has_valid_filters(&subscribe) {
848            return Err(ClientError::Request(Box::new(subscribe.into())));
849        }
850
851        self.send_tracked_subscribe_async(subscribe).await
852    }
853
854    /// Sends a MQTT Subscribe with properties to the `EventLoop`.
855    ///
856    /// # Errors
857    ///
858    /// Returns an error if the topic filter is invalid or if the request
859    /// cannot be queued on the event loop.
860    pub async fn subscribe_with_properties<S: Into<String>>(
861        &self,
862        topic: S,
863        qos: QoS,
864        properties: SubscribeProperties,
865    ) -> Result<(), ClientError> {
866        self.handle_subscribe(topic, qos, Some(properties)).await
867    }
868
869    /// Sends a tracked MQTT Subscribe with properties to the `EventLoop`.
870    ///
871    /// # Errors
872    ///
873    /// Returns an error if the topic filter is invalid or if the request
874    /// cannot be queued on the event loop.
875    pub async fn subscribe_with_properties_tracked<S: Into<String>>(
876        &self,
877        topic: S,
878        qos: QoS,
879        properties: SubscribeProperties,
880    ) -> Result<RequestNotice, ClientError> {
881        self.handle_subscribe_tracked(topic, qos, Some(properties))
882            .await
883    }
884
885    /// Sends a MQTT Subscribe to the `EventLoop`.
886    ///
887    /// # Errors
888    ///
889    /// Returns an error if the topic filter is invalid or if the request
890    /// cannot be queued on the event loop.
891    pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
892        self.handle_subscribe(topic, qos, None).await
893    }
894
895    /// Sends a tracked MQTT Subscribe to the `EventLoop`.
896    ///
897    /// # Errors
898    ///
899    /// Returns an error if the topic filter is invalid or if the request
900    /// cannot be queued on the event loop.
901    pub async fn subscribe_tracked<S: Into<String>>(
902        &self,
903        topic: S,
904        qos: QoS,
905    ) -> Result<RequestNotice, ClientError> {
906        self.handle_subscribe_tracked(topic, qos, None).await
907    }
908
909    /// Attempts to send a MQTT Subscribe to the `EventLoop`
910    fn handle_try_subscribe<S: Into<String>>(
911        &self,
912        topic: S,
913        qos: QoS,
914        properties: Option<SubscribeProperties>,
915    ) -> Result<(), ClientError> {
916        let filter = Filter::new(topic, qos);
917        let subscribe = Subscribe::new(filter, properties);
918        if !subscribe_has_valid_filters(&subscribe) {
919            return Err(ClientError::TryRequest(Box::new(subscribe.into())));
920        }
921
922        self.try_send_request(subscribe.into())?;
923        Ok(())
924    }
925
926    fn handle_try_subscribe_tracked<S: Into<String>>(
927        &self,
928        topic: S,
929        qos: QoS,
930        properties: Option<SubscribeProperties>,
931    ) -> Result<RequestNotice, ClientError> {
932        let filter = Filter::new(topic, qos);
933        let subscribe = Subscribe::new(filter, properties);
934        if !subscribe_has_valid_filters(&subscribe) {
935            return Err(ClientError::TryRequest(Box::new(subscribe.into())));
936        }
937
938        self.try_send_tracked_subscribe(subscribe)
939    }
940
941    /// Attempts to send a MQTT Subscribe with properties to the `EventLoop`.
942    ///
943    /// # Errors
944    ///
945    /// Returns an error if the topic filter is invalid or if the request
946    /// cannot be queued immediately on the event loop.
947    pub fn try_subscribe_with_properties<S: Into<String>>(
948        &self,
949        topic: S,
950        qos: QoS,
951        properties: SubscribeProperties,
952    ) -> Result<(), ClientError> {
953        self.handle_try_subscribe(topic, qos, Some(properties))
954    }
955
956    /// Attempts to send a tracked MQTT Subscribe with properties to the `EventLoop`.
957    ///
958    /// # Errors
959    ///
960    /// Returns an error if the topic filter is invalid or if the request
961    /// cannot be queued immediately on the event loop.
962    pub fn try_subscribe_with_properties_tracked<S: Into<String>>(
963        &self,
964        topic: S,
965        qos: QoS,
966        properties: SubscribeProperties,
967    ) -> Result<RequestNotice, ClientError> {
968        self.handle_try_subscribe_tracked(topic, qos, Some(properties))
969    }
970
971    /// Attempts to send a MQTT Subscribe to the `EventLoop`.
972    ///
973    /// # Errors
974    ///
975    /// Returns an error if the topic filter is invalid or if the request
976    /// cannot be queued immediately on the event loop.
977    pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
978        self.handle_try_subscribe(topic, qos, None)
979    }
980
981    /// Attempts to send a tracked MQTT Subscribe to the `EventLoop`.
982    ///
983    /// # Errors
984    ///
985    /// Returns an error if the topic filter is invalid or if the request
986    /// cannot be queued immediately on the event loop.
987    pub fn try_subscribe_tracked<S: Into<String>>(
988        &self,
989        topic: S,
990        qos: QoS,
991    ) -> Result<RequestNotice, ClientError> {
992        self.handle_try_subscribe_tracked(topic, qos, None)
993    }
994
995    /// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
996    async fn handle_subscribe_many<T>(
997        &self,
998        topics: T,
999        properties: Option<SubscribeProperties>,
1000    ) -> Result<(), ClientError>
1001    where
1002        T: IntoIterator<Item = Filter>,
1003    {
1004        let subscribe = Subscribe::new_many(topics, properties);
1005        if !subscribe_has_valid_filters(&subscribe) {
1006            return Err(ClientError::Request(Box::new(subscribe.into())));
1007        }
1008
1009        self.send_request_async(subscribe.into()).await?;
1010
1011        Ok(())
1012    }
1013
1014    async fn handle_subscribe_many_tracked<T>(
1015        &self,
1016        topics: T,
1017        properties: Option<SubscribeProperties>,
1018    ) -> Result<RequestNotice, ClientError>
1019    where
1020        T: IntoIterator<Item = Filter>,
1021    {
1022        let subscribe = Subscribe::new_many(topics, properties);
1023        if !subscribe_has_valid_filters(&subscribe) {
1024            return Err(ClientError::Request(Box::new(subscribe.into())));
1025        }
1026
1027        self.send_tracked_subscribe_async(subscribe).await
1028    }
1029
1030    /// Sends a MQTT Subscribe for multiple topics with properties to the `EventLoop`.
1031    ///
1032    /// # Errors
1033    ///
1034    /// Returns an error if the filter list is invalid or if the request cannot
1035    /// be queued on the event loop.
1036    pub async fn subscribe_many_with_properties<T>(
1037        &self,
1038        topics: T,
1039        properties: SubscribeProperties,
1040    ) -> Result<(), ClientError>
1041    where
1042        T: IntoIterator<Item = Filter>,
1043    {
1044        self.handle_subscribe_many(topics, Some(properties)).await
1045    }
1046
1047    /// Sends a tracked MQTT Subscribe for multiple topics with properties to the `EventLoop`.
1048    ///
1049    /// # Errors
1050    ///
1051    /// Returns an error if the filter list is invalid or if the request cannot
1052    /// be queued on the event loop.
1053    pub async fn subscribe_many_with_properties_tracked<T>(
1054        &self,
1055        topics: T,
1056        properties: SubscribeProperties,
1057    ) -> Result<RequestNotice, ClientError>
1058    where
1059        T: IntoIterator<Item = Filter>,
1060    {
1061        self.handle_subscribe_many_tracked(topics, Some(properties))
1062            .await
1063    }
1064
1065    /// Sends a MQTT Subscribe for multiple topics to the `EventLoop`.
1066    ///
1067    /// # Errors
1068    ///
1069    /// Returns an error if the filter list is invalid or if the request cannot
1070    /// be queued on the event loop.
1071    pub async fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
1072    where
1073        T: IntoIterator<Item = Filter>,
1074    {
1075        self.handle_subscribe_many(topics, None).await
1076    }
1077
1078    /// Sends a tracked MQTT Subscribe for multiple topics to the `EventLoop`.
1079    ///
1080    /// # Errors
1081    ///
1082    /// Returns an error if the filter list is invalid or if the request cannot
1083    /// be queued on the event loop.
1084    pub async fn subscribe_many_tracked<T>(&self, topics: T) -> Result<RequestNotice, ClientError>
1085    where
1086        T: IntoIterator<Item = Filter>,
1087    {
1088        self.handle_subscribe_many_tracked(topics, None).await
1089    }
1090
1091    /// Attempts to send a MQTT Subscribe for multiple topics to the `EventLoop`
1092    fn handle_try_subscribe_many<T>(
1093        &self,
1094        topics: T,
1095        properties: Option<SubscribeProperties>,
1096    ) -> Result<(), ClientError>
1097    where
1098        T: IntoIterator<Item = Filter>,
1099    {
1100        let subscribe = Subscribe::new_many(topics, properties);
1101        if !subscribe_has_valid_filters(&subscribe) {
1102            return Err(ClientError::TryRequest(Box::new(subscribe.into())));
1103        }
1104
1105        self.try_send_request(subscribe.into())?;
1106        Ok(())
1107    }
1108
1109    fn handle_try_subscribe_many_tracked<T>(
1110        &self,
1111        topics: T,
1112        properties: Option<SubscribeProperties>,
1113    ) -> Result<RequestNotice, ClientError>
1114    where
1115        T: IntoIterator<Item = Filter>,
1116    {
1117        let subscribe = Subscribe::new_many(topics, properties);
1118        if !subscribe_has_valid_filters(&subscribe) {
1119            return Err(ClientError::TryRequest(Box::new(subscribe.into())));
1120        }
1121
1122        self.try_send_tracked_subscribe(subscribe)
1123    }
1124
1125    /// Attempts to send a MQTT Subscribe for multiple topics with properties to the `EventLoop`.
1126    ///
1127    /// # Errors
1128    ///
1129    /// Returns an error if the filter list is invalid or if the request cannot
1130    /// be queued immediately on the event loop.
1131    pub fn try_subscribe_many_with_properties<T>(
1132        &self,
1133        topics: T,
1134        properties: SubscribeProperties,
1135    ) -> Result<(), ClientError>
1136    where
1137        T: IntoIterator<Item = Filter>,
1138    {
1139        self.handle_try_subscribe_many(topics, Some(properties))
1140    }
1141
1142    /// Attempts to send a tracked MQTT Subscribe for multiple topics with properties to the `EventLoop`.
1143    ///
1144    /// # Errors
1145    ///
1146    /// Returns an error if the filter list is invalid or if the request cannot
1147    /// be queued immediately on the event loop.
1148    pub fn try_subscribe_many_with_properties_tracked<T>(
1149        &self,
1150        topics: T,
1151        properties: SubscribeProperties,
1152    ) -> Result<RequestNotice, ClientError>
1153    where
1154        T: IntoIterator<Item = Filter>,
1155    {
1156        self.handle_try_subscribe_many_tracked(topics, Some(properties))
1157    }
1158
1159    /// Attempts to send a MQTT Subscribe for multiple topics to the `EventLoop`.
1160    ///
1161    /// # Errors
1162    ///
1163    /// Returns an error if the filter list is invalid or if the request cannot
1164    /// be queued immediately on the event loop.
1165    pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
1166    where
1167        T: IntoIterator<Item = Filter>,
1168    {
1169        self.handle_try_subscribe_many(topics, None)
1170    }
1171
1172    /// Attempts to send a tracked MQTT Subscribe for multiple topics to the `EventLoop`.
1173    ///
1174    /// # Errors
1175    ///
1176    /// Returns an error if the filter list is invalid or if the request cannot
1177    /// be queued immediately on the event loop.
1178    pub fn try_subscribe_many_tracked<T>(&self, topics: T) -> Result<RequestNotice, ClientError>
1179    where
1180        T: IntoIterator<Item = Filter>,
1181    {
1182        self.handle_try_subscribe_many_tracked(topics, None)
1183    }
1184
1185    /// Sends a MQTT Unsubscribe to the `EventLoop`
1186    async fn handle_unsubscribe<S: Into<String>>(
1187        &self,
1188        topic: S,
1189        properties: Option<UnsubscribeProperties>,
1190    ) -> Result<(), ClientError> {
1191        let unsubscribe = Unsubscribe::new(topic, properties);
1192        let request = Request::Unsubscribe(unsubscribe);
1193        self.send_request_async(request).await?;
1194        Ok(())
1195    }
1196
1197    async fn handle_unsubscribe_tracked<S: Into<String>>(
1198        &self,
1199        topic: S,
1200        properties: Option<UnsubscribeProperties>,
1201    ) -> Result<RequestNotice, ClientError> {
1202        let unsubscribe = Unsubscribe::new(topic, properties);
1203        self.send_tracked_unsubscribe_async(unsubscribe).await
1204    }
1205
1206    /// Sends a MQTT Unsubscribe with properties to the `EventLoop`.
1207    ///
1208    /// # Errors
1209    ///
1210    /// Returns an error if the request cannot be queued on the event loop.
1211    pub async fn unsubscribe_with_properties<S: Into<String>>(
1212        &self,
1213        topic: S,
1214        properties: UnsubscribeProperties,
1215    ) -> Result<(), ClientError> {
1216        self.handle_unsubscribe(topic, Some(properties)).await
1217    }
1218
1219    /// Sends a tracked MQTT Unsubscribe with properties to the `EventLoop`.
1220    ///
1221    /// # Errors
1222    ///
1223    /// Returns an error if the request cannot be queued on the event loop.
1224    pub async fn unsubscribe_with_properties_tracked<S: Into<String>>(
1225        &self,
1226        topic: S,
1227        properties: UnsubscribeProperties,
1228    ) -> Result<RequestNotice, ClientError> {
1229        self.handle_unsubscribe_tracked(topic, Some(properties))
1230            .await
1231    }
1232
1233    /// Sends a MQTT Unsubscribe to the `EventLoop`.
1234    ///
1235    /// # Errors
1236    ///
1237    /// Returns an error if the request cannot be queued on the event loop.
1238    pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
1239        self.handle_unsubscribe(topic, None).await
1240    }
1241
1242    /// Sends a tracked MQTT Unsubscribe to the `EventLoop`.
1243    ///
1244    /// # Errors
1245    ///
1246    /// Returns an error if the request cannot be queued on the event loop.
1247    pub async fn unsubscribe_tracked<S: Into<String>>(
1248        &self,
1249        topic: S,
1250    ) -> Result<RequestNotice, ClientError> {
1251        self.handle_unsubscribe_tracked(topic, None).await
1252    }
1253
1254    /// Attempts to send a MQTT Unsubscribe to the `EventLoop`
1255    fn handle_try_unsubscribe<S: Into<String>>(
1256        &self,
1257        topic: S,
1258        properties: Option<UnsubscribeProperties>,
1259    ) -> Result<(), ClientError> {
1260        let unsubscribe = Unsubscribe::new(topic, properties);
1261        let request = Request::Unsubscribe(unsubscribe);
1262        self.try_send_request(request)?;
1263        Ok(())
1264    }
1265
1266    fn handle_try_unsubscribe_tracked<S: Into<String>>(
1267        &self,
1268        topic: S,
1269        properties: Option<UnsubscribeProperties>,
1270    ) -> Result<RequestNotice, ClientError> {
1271        let unsubscribe = Unsubscribe::new(topic, properties);
1272        self.try_send_tracked_unsubscribe(unsubscribe)
1273    }
1274
1275    /// Attempts to send a MQTT Unsubscribe with properties to the `EventLoop`.
1276    ///
1277    /// # Errors
1278    ///
1279    /// Returns an error if the request cannot be queued immediately on the
1280    /// event loop.
1281    pub fn try_unsubscribe_with_properties<S: Into<String>>(
1282        &self,
1283        topic: S,
1284        properties: UnsubscribeProperties,
1285    ) -> Result<(), ClientError> {
1286        self.handle_try_unsubscribe(topic, Some(properties))
1287    }
1288
1289    /// Attempts to send a tracked MQTT Unsubscribe with properties to the `EventLoop`.
1290    ///
1291    /// # Errors
1292    ///
1293    /// Returns an error if the request cannot be queued immediately on the
1294    /// event loop.
1295    pub fn try_unsubscribe_with_properties_tracked<S: Into<String>>(
1296        &self,
1297        topic: S,
1298        properties: UnsubscribeProperties,
1299    ) -> Result<RequestNotice, ClientError> {
1300        self.handle_try_unsubscribe_tracked(topic, Some(properties))
1301    }
1302
1303    /// Attempts to send a MQTT Unsubscribe to the `EventLoop`.
1304    ///
1305    /// # Errors
1306    ///
1307    /// Returns an error if the request cannot be queued immediately on the
1308    /// event loop.
1309    pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
1310        self.handle_try_unsubscribe(topic, None)
1311    }
1312
1313    /// Attempts to send a tracked MQTT Unsubscribe to the `EventLoop`.
1314    ///
1315    /// # Errors
1316    ///
1317    /// Returns an error if the request cannot be queued immediately on the
1318    /// event loop.
1319    pub fn try_unsubscribe_tracked<S: Into<String>>(
1320        &self,
1321        topic: S,
1322    ) -> Result<RequestNotice, ClientError> {
1323        self.handle_try_unsubscribe_tracked(topic, None)
1324    }
1325
1326    /// Sends a MQTT disconnect to the `EventLoop` with default `DisconnectReasonCode::NormalDisconnection`
1327    ///
1328    /// # Errors
1329    ///
1330    /// Returns an error if the disconnect request cannot be queued on the
1331    /// event loop.
1332    pub async fn disconnect(&self) -> Result<(), ClientError> {
1333        self.handle_disconnect(DisconnectReasonCode::NormalDisconnection, None)
1334            .await
1335    }
1336
1337    /// Sends a MQTT disconnect to the `EventLoop` with properties.
1338    ///
1339    /// # Errors
1340    ///
1341    /// Returns an error if the disconnect request cannot be queued on the
1342    /// event loop.
1343    pub async fn disconnect_with_properties(
1344        &self,
1345        reason: DisconnectReasonCode,
1346        properties: DisconnectProperties,
1347    ) -> Result<(), ClientError> {
1348        self.handle_disconnect(reason, Some(properties)).await
1349    }
1350
1351    // Handle disconnect interface which can have properties or not
1352    async fn handle_disconnect(
1353        &self,
1354        reason: DisconnectReasonCode,
1355        properties: Option<DisconnectProperties>,
1356    ) -> Result<(), ClientError> {
1357        let request = Self::build_disconnect_request(reason, properties);
1358        self.send_request_async(request).await?;
1359        Ok(())
1360    }
1361
1362    /// Attempts to send a MQTT disconnect to the `EventLoop` with default `DisconnectReasonCode::NormalDisconnection`
1363    ///
1364    /// # Errors
1365    ///
1366    /// Returns an error if the disconnect request cannot be queued
1367    /// immediately on the event loop.
1368    pub fn try_disconnect(&self) -> Result<(), ClientError> {
1369        self.handle_try_disconnect(DisconnectReasonCode::NormalDisconnection, None)
1370    }
1371
1372    /// Attempts to send a MQTT disconnect to the `EventLoop` with properties.
1373    ///
1374    /// # Errors
1375    ///
1376    /// Returns an error if the disconnect request cannot be queued
1377    /// immediately on the event loop.
1378    pub fn try_disconnect_with_properties(
1379        &self,
1380        reason: DisconnectReasonCode,
1381        properties: DisconnectProperties,
1382    ) -> Result<(), ClientError> {
1383        self.handle_try_disconnect(reason, Some(properties))
1384    }
1385
1386    // Handle disconnect interface which can have properties or not
1387    fn handle_try_disconnect(
1388        &self,
1389        reason: DisconnectReasonCode,
1390        properties: Option<DisconnectProperties>,
1391    ) -> Result<(), ClientError> {
1392        let request = Self::build_disconnect_request(reason, properties);
1393        self.try_send_request(request)?;
1394        Ok(())
1395    }
1396
1397    // Helper function to build disconnect request
1398    fn build_disconnect_request(
1399        reason: DisconnectReasonCode,
1400        properties: Option<DisconnectProperties>,
1401    ) -> Request {
1402        properties.map_or_else(
1403            || Request::Disconnect(Disconnect::new(reason)),
1404            |p| Request::Disconnect(Disconnect::new_with_properties(reason, p)),
1405        )
1406    }
1407}
1408
1409const fn prepare_ack(publish: &Publish) -> Option<ManualAck> {
1410    let ack = match publish.qos {
1411        QoS::AtMostOnce => return None,
1412        QoS::AtLeastOnce => ManualAck::PubAck(PubAck::new(publish.pkid, None)),
1413        QoS::ExactlyOnce => ManualAck::PubRec(PubRec::new(publish.pkid, None)),
1414    };
1415    Some(ack)
1416}
1417
1418/// A synchronous client, communicates with MQTT `EventLoop`.
1419///
1420/// This is cloneable and can be used to synchronously [`publish`](`AsyncClient::publish`),
1421/// [`subscribe`](`AsyncClient::subscribe`) through the `EventLoop`/`Connection`, which is to be polled in parallel
1422/// by iterating over the object returned by [`Connection.iter()`](Connection::iter) in a separate thread.
1423///
1424/// **NOTE**: The `EventLoop`/`Connection` must be regularly polled(`.next()` in case of `Connection`) in order
1425/// to send, receive and process packets from the broker, i.e. move ahead.
1426///
1427/// An asynchronous channel handle can also be extracted if necessary.
1428#[derive(Clone)]
1429pub struct Client {
1430    client: AsyncClient,
1431}
1432
1433impl Client {
1434    /// Create a new `Client`
1435    ///
1436    /// `cap` specifies the capacity of the bounded async channel.
1437    ///
1438    /// # Panics
1439    ///
1440    /// Panics if the current-thread Tokio runtime cannot be created.
1441    pub fn new(options: MqttOptions, cap: usize) -> (Self, Connection) {
1442        let (client, eventloop) = AsyncClient::new(options, cap);
1443        let client = Self { client };
1444
1445        let runtime = runtime::Builder::new_current_thread()
1446            .enable_all()
1447            .build()
1448            .unwrap();
1449
1450        let connection = Connection::new(eventloop, runtime);
1451        (client, connection)
1452    }
1453
1454    /// Create a new `Client` from a channel `Sender`.
1455    ///
1456    /// This is mostly useful for creating a test instance where you can
1457    /// listen on the corresponding receiver.
1458    #[must_use]
1459    pub const fn from_sender(request_tx: Sender<Request>) -> Self {
1460        Self {
1461            client: AsyncClient::from_senders(request_tx),
1462        }
1463    }
1464
1465    /// Sends a MQTT Publish to the `EventLoop`
1466    fn handle_publish<T, P>(
1467        &self,
1468        topic: T,
1469        qos: QoS,
1470        retain: bool,
1471        payload: P,
1472        properties: Option<PublishProperties>,
1473    ) -> Result<(), ClientError>
1474    where
1475        T: Into<PublishTopic>,
1476        P: Into<Bytes>,
1477    {
1478        let (topic, needs_validation) = topic.into().into_string_and_validation();
1479        let invalid_topic = (needs_validation && !valid_topic(&topic))
1480            || empty_topic_without_valid_alias(&topic, properties.as_ref());
1481        let mut publish = Publish::new(topic, qos, payload, properties);
1482        publish.retain = retain;
1483        let request = Request::Publish(publish);
1484
1485        if invalid_topic {
1486            return Err(ClientError::Request(Box::new(request)));
1487        }
1488
1489        self.client.send_request(request)?;
1490        Ok(())
1491    }
1492
1493    /// Sends a MQTT Publish with properties to the `EventLoop`.
1494    ///
1495    /// # Errors
1496    ///
1497    /// Returns an error if the topic or topic alias usage is invalid, or if
1498    /// the request cannot be queued on the event loop.
1499    pub fn publish_with_properties<T, P>(
1500        &self,
1501        topic: T,
1502        qos: QoS,
1503        retain: bool,
1504        payload: P,
1505        properties: PublishProperties,
1506    ) -> Result<(), ClientError>
1507    where
1508        T: Into<PublishTopic>,
1509        P: Into<Bytes>,
1510    {
1511        self.handle_publish(topic, qos, retain, payload, Some(properties))
1512    }
1513
1514    /// Sends a MQTT Publish to the `EventLoop`.
1515    ///
1516    /// # Errors
1517    ///
1518    /// Returns an error if the topic or topic alias usage is invalid, or if
1519    /// the request cannot be queued on the event loop.
1520    pub fn publish<T, P>(
1521        &self,
1522        topic: T,
1523        qos: QoS,
1524        retain: bool,
1525        payload: P,
1526    ) -> Result<(), ClientError>
1527    where
1528        T: Into<PublishTopic>,
1529        P: Into<Bytes>,
1530    {
1531        self.handle_publish(topic, qos, retain, payload, None)
1532    }
1533
1534    /// Attempts to send a MQTT Publish with properties to the `EventLoop`.
1535    ///
1536    /// # Errors
1537    ///
1538    /// Returns an error if the topic or topic alias usage is invalid, or if
1539    /// the request cannot be queued immediately on the event loop.
1540    pub fn try_publish_with_properties<T, P>(
1541        &self,
1542        topic: T,
1543        qos: QoS,
1544        retain: bool,
1545        payload: P,
1546        properties: PublishProperties,
1547    ) -> Result<(), ClientError>
1548    where
1549        T: Into<PublishTopic>,
1550        P: Into<Bytes>,
1551    {
1552        self.client
1553            .try_publish_with_properties(topic, qos, retain, payload, properties)
1554    }
1555
1556    /// Attempts to send a MQTT Publish to the `EventLoop`.
1557    ///
1558    /// # Errors
1559    ///
1560    /// Returns an error if the topic or topic alias usage is invalid, or if
1561    /// the request cannot be queued immediately on the event loop.
1562    pub fn try_publish<T, P>(
1563        &self,
1564        topic: T,
1565        qos: QoS,
1566        retain: bool,
1567        payload: P,
1568    ) -> Result<(), ClientError>
1569    where
1570        T: Into<PublishTopic>,
1571        P: Into<Bytes>,
1572    {
1573        self.client.try_publish(topic, qos, retain, payload)
1574    }
1575
1576    /// Prepares a MQTT PubAck/PubRec packet for manual acknowledgement.
1577    ///
1578    /// Returns `None` for `QoS0` publishes, which do not require acknowledgement.
1579    pub const fn prepare_ack(&self, publish: &Publish) -> Option<ManualAck> {
1580        self.client.prepare_ack(publish)
1581    }
1582
1583    /// Sends a prepared MQTT PubAck/PubRec to the `EventLoop`.
1584    ///
1585    /// # Errors
1586    ///
1587    /// Returns an error if the acknowledgement cannot be queued on the event
1588    /// loop.
1589    pub fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
1590        self.client.send_request(ack.into_request())?;
1591        Ok(())
1592    }
1593
1594    /// Attempts to send a prepared MQTT PubAck/PubRec to the `EventLoop`.
1595    ///
1596    /// # Errors
1597    ///
1598    /// Returns an error if the acknowledgement cannot be queued immediately on
1599    /// the event loop.
1600    pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
1601        self.client.try_manual_ack(ack)?;
1602        Ok(())
1603    }
1604
1605    /// Sends a MQTT PubAck/PubRec to the `EventLoop` based on publish `QoS`.
1606    /// Only needed if the `manual_acks` flag is set.
1607    ///
1608    /// # Errors
1609    ///
1610    /// Returns an error if the derived acknowledgement cannot be queued on the
1611    /// event loop.
1612    pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
1613        if let Some(ack) = self.prepare_ack(publish) {
1614            self.manual_ack(ack)?;
1615        }
1616        Ok(())
1617    }
1618
1619    /// Attempts to send a MQTT PubAck/PubRec to the `EventLoop` based on publish `QoS`.
1620    /// Only needed if the `manual_acks` flag is set.
1621    ///
1622    /// # Errors
1623    ///
1624    /// Returns an error if the derived acknowledgement cannot be queued
1625    /// immediately on the event loop.
1626    pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
1627        if let Some(ack) = self.prepare_ack(publish) {
1628            self.try_manual_ack(ack)?;
1629        }
1630        Ok(())
1631    }
1632
1633    /// Sends a MQTT AUTH packet to the `EventLoop`.
1634    ///
1635    /// # Errors
1636    ///
1637    /// Returns an error if the AUTH packet cannot be queued on the event loop.
1638    pub fn reauth(&self, properties: Option<AuthProperties>) -> Result<(), ClientError> {
1639        let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
1640        let auth = Request::Auth(auth);
1641        self.client.send_request(auth)?;
1642        Ok(())
1643    }
1644
1645    /// Attempts to send a MQTT AUTH packet to the `EventLoop`.
1646    ///
1647    /// # Errors
1648    ///
1649    /// Returns an error if the AUTH packet cannot be queued immediately on the
1650    /// event loop.
1651    pub fn try_reauth(&self, properties: Option<AuthProperties>) -> Result<(), ClientError> {
1652        let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
1653        let auth = Request::Auth(auth);
1654        self.client.try_send_request(auth)?;
1655        Ok(())
1656    }
1657
1658    /// Sends a MQTT Subscribe to the `EventLoop`
1659    fn handle_subscribe<S: Into<String>>(
1660        &self,
1661        topic: S,
1662        qos: QoS,
1663        properties: Option<SubscribeProperties>,
1664    ) -> Result<(), ClientError> {
1665        let filter = Filter::new(topic, qos);
1666        let subscribe = Subscribe::new(filter, properties);
1667        if !subscribe_has_valid_filters(&subscribe) {
1668            return Err(ClientError::Request(Box::new(subscribe.into())));
1669        }
1670
1671        self.client.send_request(subscribe.into())?;
1672        Ok(())
1673    }
1674
1675    /// Sends a MQTT Subscribe with properties to the `EventLoop`.
1676    ///
1677    /// # Errors
1678    ///
1679    /// Returns an error if the topic filter is invalid or if the request
1680    /// cannot be queued on the event loop.
1681    pub fn subscribe_with_properties<S: Into<String>>(
1682        &self,
1683        topic: S,
1684        qos: QoS,
1685        properties: SubscribeProperties,
1686    ) -> Result<(), ClientError> {
1687        self.handle_subscribe(topic, qos, Some(properties))
1688    }
1689
1690    /// Sends a MQTT Subscribe to the `EventLoop`.
1691    ///
1692    /// # Errors
1693    ///
1694    /// Returns an error if the topic filter is invalid or if the request
1695    /// cannot be queued on the event loop.
1696    pub fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
1697        self.handle_subscribe(topic, qos, None)
1698    }
1699
1700    /// Attempts to send a MQTT Subscribe with properties to the `EventLoop`.
1701    ///
1702    /// # Errors
1703    ///
1704    /// Returns an error if the topic filter is invalid or if the request
1705    /// cannot be queued immediately on the event loop.
1706    pub fn try_subscribe_with_properties<S: Into<String>>(
1707        &self,
1708        topic: S,
1709        qos: QoS,
1710        properties: SubscribeProperties,
1711    ) -> Result<(), ClientError> {
1712        self.client
1713            .try_subscribe_with_properties(topic, qos, properties)
1714    }
1715
1716    /// Attempts to send a MQTT Subscribe to the `EventLoop`.
1717    ///
1718    /// # Errors
1719    ///
1720    /// Returns an error if the topic filter is invalid or if the request
1721    /// cannot be queued immediately on the event loop.
1722    pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
1723        self.client.try_subscribe(topic, qos)
1724    }
1725
1726    /// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
1727    fn handle_subscribe_many<T>(
1728        &self,
1729        topics: T,
1730        properties: Option<SubscribeProperties>,
1731    ) -> Result<(), ClientError>
1732    where
1733        T: IntoIterator<Item = Filter>,
1734    {
1735        let subscribe = Subscribe::new_many(topics, properties);
1736        if !subscribe_has_valid_filters(&subscribe) {
1737            return Err(ClientError::Request(Box::new(subscribe.into())));
1738        }
1739
1740        self.client.send_request(subscribe.into())?;
1741        Ok(())
1742    }
1743
1744    /// Sends a MQTT Subscribe for multiple topics with properties to the `EventLoop`.
1745    ///
1746    /// # Errors
1747    ///
1748    /// Returns an error if the filter list is invalid or if the request cannot
1749    /// be queued on the event loop.
1750    pub fn subscribe_many_with_properties<T>(
1751        &self,
1752        topics: T,
1753        properties: SubscribeProperties,
1754    ) -> Result<(), ClientError>
1755    where
1756        T: IntoIterator<Item = Filter>,
1757    {
1758        self.handle_subscribe_many(topics, Some(properties))
1759    }
1760
1761    /// Sends a MQTT Subscribe for multiple topics to the `EventLoop`.
1762    ///
1763    /// # Errors
1764    ///
1765    /// Returns an error if the filter list is invalid or if the request cannot
1766    /// be queued on the event loop.
1767    pub fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
1768    where
1769        T: IntoIterator<Item = Filter>,
1770    {
1771        self.handle_subscribe_many(topics, None)
1772    }
1773
1774    /// Attempts to send a MQTT Subscribe for multiple topics with properties to the `EventLoop`.
1775    ///
1776    /// # Errors
1777    ///
1778    /// Returns an error if the filter list is invalid or if the request cannot
1779    /// be queued immediately on the event loop.
1780    pub fn try_subscribe_many_with_properties<T>(
1781        &self,
1782        topics: T,
1783        properties: SubscribeProperties,
1784    ) -> Result<(), ClientError>
1785    where
1786        T: IntoIterator<Item = Filter>,
1787    {
1788        self.client
1789            .try_subscribe_many_with_properties(topics, properties)
1790    }
1791
1792    /// Attempts to send a MQTT Subscribe for multiple topics to the `EventLoop`.
1793    ///
1794    /// # Errors
1795    ///
1796    /// Returns an error if the filter list is invalid or if the request cannot
1797    /// be queued immediately on the event loop.
1798    pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
1799    where
1800        T: IntoIterator<Item = Filter>,
1801    {
1802        self.client.try_subscribe_many(topics)
1803    }
1804
1805    /// Sends a MQTT Unsubscribe to the `EventLoop`
1806    fn handle_unsubscribe<S: Into<String>>(
1807        &self,
1808        topic: S,
1809        properties: Option<UnsubscribeProperties>,
1810    ) -> Result<(), ClientError> {
1811        let unsubscribe = Unsubscribe::new(topic, properties);
1812        let request = Request::Unsubscribe(unsubscribe);
1813        self.client.send_request(request)?;
1814        Ok(())
1815    }
1816
1817    /// Sends a MQTT Unsubscribe with properties to the `EventLoop`.
1818    ///
1819    /// # Errors
1820    ///
1821    /// Returns an error if the request cannot be queued on the event loop.
1822    pub fn unsubscribe_with_properties<S: Into<String>>(
1823        &self,
1824        topic: S,
1825        properties: UnsubscribeProperties,
1826    ) -> Result<(), ClientError> {
1827        self.handle_unsubscribe(topic, Some(properties))
1828    }
1829
1830    /// Sends a MQTT Unsubscribe to the `EventLoop`.
1831    ///
1832    /// # Errors
1833    ///
1834    /// Returns an error if the request cannot be queued on the event loop.
1835    pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
1836        self.handle_unsubscribe(topic, None)
1837    }
1838
1839    /// Attempts to send a MQTT Unsubscribe with properties to the `EventLoop`.
1840    ///
1841    /// # Errors
1842    ///
1843    /// Returns an error if the request cannot be queued immediately on the
1844    /// event loop.
1845    pub fn try_unsubscribe_with_properties<S: Into<String>>(
1846        &self,
1847        topic: S,
1848        properties: UnsubscribeProperties,
1849    ) -> Result<(), ClientError> {
1850        self.client
1851            .try_unsubscribe_with_properties(topic, properties)
1852    }
1853
1854    /// Attempts to send a MQTT Unsubscribe to the `EventLoop`.
1855    ///
1856    /// # Errors
1857    ///
1858    /// Returns an error if the request cannot be queued immediately on the
1859    /// event loop.
1860    pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
1861        self.client.try_unsubscribe(topic)
1862    }
1863
1864    /// Sends a MQTT disconnect to the `EventLoop`.
1865    ///
1866    /// # Errors
1867    ///
1868    /// Returns an error if the disconnect request cannot be queued on the
1869    /// event loop.
1870    pub fn disconnect(&self) -> Result<(), ClientError> {
1871        self.handle_disconnect(DisconnectReasonCode::NormalDisconnection, None)
1872    }
1873
1874    /// Sends a MQTT disconnect to the `EventLoop` with properties.
1875    ///
1876    /// # Errors
1877    ///
1878    /// Returns an error if the disconnect request cannot be queued on the
1879    /// event loop.
1880    pub fn disconnect_with_properties(
1881        &self,
1882        reason: DisconnectReasonCode,
1883        properties: DisconnectProperties,
1884    ) -> Result<(), ClientError> {
1885        self.handle_disconnect(reason, Some(properties))
1886    }
1887
1888    fn handle_disconnect(
1889        &self,
1890        reason: DisconnectReasonCode,
1891        properties: Option<DisconnectProperties>,
1892    ) -> Result<(), ClientError> {
1893        let request = AsyncClient::build_disconnect_request(reason, properties);
1894        self.client.send_request(request)?;
1895        Ok(())
1896    }
1897
1898    /// Attempts to send a MQTT disconnect to the `EventLoop`.
1899    ///
1900    /// # Errors
1901    ///
1902    /// Returns an error if the disconnect request cannot be queued
1903    /// immediately on the event loop.
1904    pub fn try_disconnect(&self) -> Result<(), ClientError> {
1905        self.client.try_disconnect()
1906    }
1907
1908    /// Attempts to send a MQTT disconnect to the `EventLoop` with properties.
1909    ///
1910    /// # Errors
1911    ///
1912    /// Returns an error if the disconnect request cannot be queued
1913    /// immediately on the event loop.
1914    pub fn try_disconnect_with_properties(
1915        &self,
1916        reason: DisconnectReasonCode,
1917        properties: DisconnectProperties,
1918    ) -> Result<(), ClientError> {
1919        self.client.handle_try_disconnect(reason, Some(properties))
1920    }
1921}
1922
1923#[must_use]
1924fn empty_topic_without_valid_alias(topic: &str, properties: Option<&PublishProperties>) -> bool {
1925    topic.is_empty()
1926        && properties
1927            .and_then(|props| props.topic_alias)
1928            .unwrap_or_default()
1929            == 0
1930}
1931
1932#[must_use]
1933fn subscribe_has_valid_filters(subscribe: &Subscribe) -> bool {
1934    !subscribe.filters.is_empty()
1935        && subscribe
1936            .filters
1937            .iter()
1938            .all(|filter| valid_filter(&filter.path))
1939}
1940
/// Error type returned by [`Connection::recv`].
///
/// Signals that all request senders have been dropped, so no further events
/// will be produced.
#[derive(Debug, Eq, PartialEq)]
pub struct RecvError;

impl std::fmt::Display for RecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("requests channel closed: all request senders have been dropped")
    }
}

// Implemented so the error can be boxed as `dyn Error` and propagated with `?`.
impl std::error::Error for RecvError {}
1944
/// Error type returned by [`Connection::try_recv`].
#[derive(Debug, Eq, PartialEq)]
pub enum TryRecvError {
    /// All request senders have been dropped (the requests channel is closed).
    Disconnected,
    /// No event was immediately ready.
    Empty,
}

impl std::fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Disconnected => {
                "requests channel closed: all request senders have been dropped"
            }
            Self::Empty => "no event is immediately available",
        };
        f.write_str(message)
    }
}

// Implemented so the error can be boxed as `dyn Error` and propagated with `?`.
impl std::error::Error for TryRecvError {}
1953
/// Error type returned by [`Connection::recv_timeout`].
#[derive(Debug, Eq, PartialEq)]
pub enum RecvTimeoutError {
    /// All request senders have been dropped (the requests channel is closed).
    Disconnected,
    /// No event arrived before the timeout elapsed.
    Timeout,
}

impl std::fmt::Display for RecvTimeoutError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Disconnected => {
                "requests channel closed: all request senders have been dropped"
            }
            Self::Timeout => "timed out waiting for an event",
        };
        f.write_str(message)
    }
}

// Implemented so the error can be boxed as `dyn Error` and propagated with `?`.
impl std::error::Error for RecvTimeoutError {}
1962
1963///  MQTT connection. Maintains all the necessary state
1964pub struct Connection {
1965    pub eventloop: EventLoop,
1966    runtime: Runtime,
1967}
1968impl Connection {
1969    const fn new(eventloop: EventLoop, runtime: Runtime) -> Self {
1970        Self { eventloop, runtime }
1971    }
1972
1973    /// Returns an iterator over this connection. Iterating over this is all that's
1974    /// necessary to make connection progress and maintain a robust connection.
1975    /// Just continuing to loop will reconnect
1976    /// **NOTE** Don't block this while iterating
1977    // ideally this should be named iter_mut because it requires a mutable reference
1978    // Also we can implement IntoIter for this to make it easy to iterate over it
1979    #[must_use = "Connection should be iterated over a loop to make progress"]
1980    pub const fn iter(&mut self) -> Iter<'_> {
1981        Iter { connection: self }
1982    }
1983
1984    /// Attempt to fetch an incoming [`Event`] on the [`EventLoop`], returning an error
1985    /// if all clients/users have closed requests channel.
1986    ///
1987    /// [`EventLoop`]: super::EventLoop
1988    ///
1989    /// # Errors
1990    ///
1991    /// Returns [`RecvError`] if all request senders have been dropped.
1992    pub fn recv(&mut self) -> Result<Result<Event, ConnectionError>, RecvError> {
1993        let f = self.eventloop.poll();
1994        let event = self.runtime.block_on(f);
1995
1996        resolve_event(event).ok_or(RecvError)
1997    }
1998
1999    /// Attempt to fetch an incoming [`Event`] on the [`EventLoop`], returning an error
2000    /// if none immediately present or all clients/users have closed requests channel.
2001    ///
2002    /// [`EventLoop`]: super::EventLoop
2003    ///
2004    /// # Errors
2005    ///
2006    /// Returns [`TryRecvError::Empty`] if no event is immediately ready, or
2007    /// [`TryRecvError::Disconnected`] if all request senders have been dropped.
2008    pub fn try_recv(&mut self) -> Result<Result<Event, ConnectionError>, TryRecvError> {
2009        let f = self.eventloop.poll();
2010        // Enters the runtime context so we can poll the future, as required by `now_or_never()`.
2011        // ref: https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#method.enter
2012        let _guard = self.runtime.enter();
2013        let event = f.now_or_never().ok_or(TryRecvError::Empty)?;
2014
2015        resolve_event(event).ok_or(TryRecvError::Disconnected)
2016    }
2017
2018    /// Attempt to fetch an incoming [`Event`] on the [`EventLoop`], returning an error
2019    /// if all clients/users have closed requests channel or the timeout has expired.
2020    ///
2021    /// [`EventLoop`]: super::EventLoop
2022    ///
2023    /// # Errors
2024    ///
2025    /// Returns [`RecvTimeoutError::Timeout`] if no event arrives before
2026    /// `duration`, or [`RecvTimeoutError::Disconnected`] if all request
2027    /// senders have been dropped.
2028    pub fn recv_timeout(
2029        &mut self,
2030        duration: Duration,
2031    ) -> Result<Result<Event, ConnectionError>, RecvTimeoutError> {
2032        let f = self.eventloop.poll();
2033        let event = self
2034            .runtime
2035            .block_on(async { timeout(duration, f).await })
2036            .map_err(|_| RecvTimeoutError::Timeout)?;
2037
2038        resolve_event(event).ok_or(RecvTimeoutError::Disconnected)
2039    }
2040}
2041
2042fn resolve_event(event: Result<Event, ConnectionError>) -> Option<Result<Event, ConnectionError>> {
2043    match event {
2044        Ok(v) => Some(Ok(v)),
2045        // closing of request channel should stop the iterator
2046        Err(ConnectionError::RequestsDone) => {
2047            trace!("Done with requests");
2048            None
2049        }
2050        Err(e) => Some(Err(e)),
2051    }
2052}
2053
2054/// Iterator which polls the `EventLoop` for connection progress
2055pub struct Iter<'a> {
2056    connection: &'a mut Connection,
2057}
2058
2059impl Iterator for Iter<'_> {
2060    type Item = Result<Event, ConnectionError>;
2061
2062    fn next(&mut self) -> Option<Self::Item> {
2063        self.connection.recv().ok()
2064    }
2065}
2066
2067#[cfg(test)]
2068mod test {
2069    use crate::mqttbytes::v5::{
2070        LastWill, PubAckProperties, PubAckReason, PubRecProperties, PubRecReason,
2071    };
2072
2073    use super::*;
2074
2075    #[test]
2076    fn calling_iter_twice_on_connection_shouldnt_panic() {
2077        let mut mqttoptions = MqttOptions::new("test-1", "localhost");
2078        let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false, None);
2079        mqttoptions.set_keep_alive(5).set_last_will(will);
2080
2081        let (_, mut connection) = Client::new(mqttoptions, 10);
2082        let _ = connection.iter();
2083        let _ = connection.iter();
2084    }
2085
2086    #[test]
2087    fn should_be_able_to_build_test_client_from_channel() {
2088        let (tx, rx) = flume::bounded(1);
2089        let client = Client::from_sender(tx);
2090        client
2091            .publish("hello/world", QoS::ExactlyOnce, false, "good bye")
2092            .expect("Should be able to publish");
2093        let _ = rx.try_recv().expect("Should have message");
2094    }
2095
2096    #[test]
2097    fn prepare_ack_maps_qos_to_manual_ack_packets_v5() {
2098        let (tx, _) = flume::bounded(1);
2099        let client = Client::from_sender(tx);
2100
2101        let qos0 = Publish::new("hello/world", QoS::AtMostOnce, vec![1], None);
2102        assert!(client.prepare_ack(&qos0).is_none());
2103
2104        let mut qos1 = Publish::new("hello/world", QoS::AtLeastOnce, vec![1], None);
2105        qos1.pkid = 7;
2106        match client.prepare_ack(&qos1) {
2107            Some(ManualAck::PubAck(ack)) => assert_eq!(ack.pkid, 7),
2108            ack => panic!("expected QoS1 PubAck, got {ack:?}"),
2109        }
2110
2111        let mut qos2 = Publish::new("hello/world", QoS::ExactlyOnce, vec![1], None);
2112        qos2.pkid = 9;
2113        match client.prepare_ack(&qos2) {
2114            Some(ManualAck::PubRec(ack)) => assert_eq!(ack.pkid, 9),
2115            ack => panic!("expected QoS2 PubRec, got {ack:?}"),
2116        }
2117    }
2118
2119    #[test]
2120    fn manual_ack_sends_custom_puback_reason_and_properties() {
2121        let (tx, rx) = flume::bounded(1);
2122        let client = Client::from_sender(tx);
2123
2124        let expected_properties = PubAckProperties {
2125            reason_string: Some("no downstream subscribers".to_owned()),
2126            user_properties: vec![("source".to_owned(), "unit-test".to_owned())],
2127        };
2128        let mut ack = PubAck::new(41, None);
2129        ack.reason = PubAckReason::NoMatchingSubscribers;
2130        ack.properties = Some(expected_properties.clone());
2131
2132        client
2133            .manual_ack(ManualAck::PubAck(ack))
2134            .expect("manual_ack should send request");
2135
2136        let request = rx.try_recv().expect("Should have ack request");
2137        match request {
2138            Request::PubAck(ack) => {
2139                assert_eq!(ack.pkid, 41);
2140                assert_eq!(ack.reason, PubAckReason::NoMatchingSubscribers);
2141                assert_eq!(ack.properties, Some(expected_properties));
2142            }
2143            request => panic!("Expected PubAck request, got {request:?}"),
2144        }
2145    }
2146
2147    #[test]
2148    fn try_manual_ack_sends_custom_pubrec_reason_and_properties() {
2149        let (tx, rx) = flume::bounded(1);
2150        let client = Client::from_sender(tx);
2151
2152        let expected_properties = PubRecProperties {
2153            reason_string: Some("queued for qos2 flow".to_owned()),
2154            user_properties: vec![("source".to_owned(), "unit-test".to_owned())],
2155        };
2156        let mut ack = PubRec::new(52, None);
2157        ack.reason = PubRecReason::ImplementationSpecificError;
2158        ack.properties = Some(expected_properties.clone());
2159
2160        client
2161            .try_manual_ack(ManualAck::PubRec(ack))
2162            .expect("try_manual_ack should send request");
2163
2164        let request = rx.try_recv().expect("Should have ack request");
2165        match request {
2166            Request::PubRec(ack) => {
2167                assert_eq!(ack.pkid, 52);
2168                assert_eq!(ack.reason, PubRecReason::ImplementationSpecificError);
2169                assert_eq!(ack.properties, Some(expected_properties));
2170            }
2171            request => panic!("Expected PubRec request, got {request:?}"),
2172        }
2173    }
2174
2175    #[test]
2176    fn ack_and_try_ack_send_default_success_packets_v5() {
2177        let (tx, rx) = flume::bounded(2);
2178        let client = Client::from_sender(tx);
2179
2180        let mut qos1 = Publish::new("hello/world", QoS::AtLeastOnce, vec![1], None);
2181        qos1.pkid = 11;
2182        client.ack(&qos1).expect("ack should send PubAck");
2183
2184        let mut qos2 = Publish::new("hello/world", QoS::ExactlyOnce, vec![1], None);
2185        qos2.pkid = 13;
2186        client
2187            .try_ack(&qos2)
2188            .expect("try_ack should send PubRec request");
2189
2190        let first = rx.try_recv().expect("Should receive first ack request");
2191        match first {
2192            Request::PubAck(ack) => {
2193                assert_eq!(ack.pkid, 11);
2194                assert_eq!(ack.reason, PubAckReason::Success);
2195                assert_eq!(ack.properties, None);
2196            }
2197            request => panic!("Expected PubAck request, got {request:?}"),
2198        }
2199
2200        let second = rx.try_recv().expect("Should receive second ack request");
2201        match second {
2202            Request::PubRec(ack) => {
2203                assert_eq!(ack.pkid, 13);
2204                assert_eq!(ack.reason, PubRecReason::Success);
2205                assert_eq!(ack.properties, None);
2206            }
2207            request => panic!("Expected PubRec request, got {request:?}"),
2208        }
2209    }
2210
2211    #[test]
2212    fn test_reauth() {
2213        let (client, mut connection) = Client::new(MqttOptions::new("test-1", "localhost"), 10);
2214        let props = AuthProperties {
2215            method: Some("test".to_string()),
2216            data: Some(Bytes::from("test")),
2217            reason: None,
2218            user_properties: vec![],
2219        };
2220        client
2221            .reauth(Some(props.clone()))
2222            .expect("Should be able to reauth");
2223        let _ = connection.iter().next().expect("Should have event");
2224
2225        client
2226            .try_reauth(Some(props))
2227            .expect("Should be able to reauth");
2228        let _ = connection.iter().next().expect("Should have event");
2229    }
2230
2231    #[test]
2232    fn can_publish_with_validated_topic() {
2233        let (tx, rx) = flume::bounded(1);
2234        let client = Client::from_sender(tx);
2235        let valid_topic = ValidatedTopic::new("hello/world").unwrap();
2236        client
2237            .publish(valid_topic, QoS::ExactlyOnce, false, "good bye")
2238            .expect("Should be able to publish");
2239        let _ = rx.try_recv().expect("Should have message");
2240    }
2241
2242    #[test]
2243    fn publish_accepts_borrowed_string_topic() {
2244        let (tx, rx) = flume::bounded(2);
2245        let client = Client::from_sender(tx);
2246        let topic = "hello/world".to_string();
2247        client
2248            .publish(&topic, QoS::ExactlyOnce, false, "good bye")
2249            .expect("Should be able to publish");
2250        client
2251            .try_publish(&topic, QoS::ExactlyOnce, false, "good bye")
2252            .expect("Should be able to publish");
2253        let _ = rx.try_recv().expect("Should have message");
2254        let _ = rx.try_recv().expect("Should have message");
2255    }
2256
2257    #[test]
2258    fn publish_accepts_cow_topic_variants() {
2259        let (tx, rx) = flume::bounded(2);
2260        let client = Client::from_sender(tx);
2261        client
2262            .publish(
2263                std::borrow::Cow::Borrowed("hello/world"),
2264                QoS::ExactlyOnce,
2265                false,
2266                "good bye",
2267            )
2268            .expect("Should be able to publish");
2269        client
2270            .try_publish(
2271                std::borrow::Cow::Owned("hello/world".to_owned()),
2272                QoS::ExactlyOnce,
2273                false,
2274                "good bye",
2275            )
2276            .expect("Should be able to publish");
2277        let _ = rx.try_recv().expect("Should have message");
2278        let _ = rx.try_recv().expect("Should have message");
2279    }
2280
2281    #[test]
2282    fn publishing_invalid_cow_topic_fails() {
2283        let (tx, _) = flume::bounded(1);
2284        let client = Client::from_sender(tx);
2285        let err = client
2286            .publish(
2287                std::borrow::Cow::Borrowed("a/+/b"),
2288                QoS::ExactlyOnce,
2289                false,
2290                "good bye",
2291            )
2292            .expect_err("Invalid publish topic should fail");
2293        assert!(matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_))));
2294    }
2295
2296    #[test]
2297    fn validated_topic_ergonomics() {
2298        let valid_topic = ValidatedTopic::new("hello/world").unwrap();
2299        let valid_topic_can_be_cloned = valid_topic.clone();
2300        // ValidatedTopic can be compared
2301        assert_eq!(valid_topic, valid_topic_can_be_cloned);
2302    }
2303
2304    #[test]
2305    fn creating_invalid_validated_topic_fails() {
2306        assert_eq!(
2307            ValidatedTopic::new("a/+/b"),
2308            Err(InvalidTopic("a/+/b".to_string()))
2309        );
2310    }
2311
2312    #[test]
2313    fn publish_with_properties_accepts_validated_topic() {
2314        let (tx, rx) = flume::bounded(1);
2315        let client = Client::from_sender(tx);
2316        let valid_topic = ValidatedTopic::new("hello/world").unwrap();
2317        client
2318            .publish_with_properties(
2319                valid_topic,
2320                QoS::ExactlyOnce,
2321                false,
2322                "good bye",
2323                PublishProperties::default(),
2324            )
2325            .expect("Should be able to publish");
2326        let _ = rx.try_recv().expect("Should have message");
2327    }
2328
2329    #[test]
2330    fn publish_with_properties_empty_topic_requires_nonzero_alias() {
2331        let (tx, _) = flume::bounded(1);
2332        let client = Client::from_sender(tx);
2333
2334        let err = client
2335            .publish_with_properties(
2336                "",
2337                QoS::AtMostOnce,
2338                false,
2339                "good bye",
2340                PublishProperties::default(),
2341            )
2342            .expect_err("Empty topic without topic alias should fail");
2343        assert!(matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_))));
2344
2345        let err = client
2346            .publish_with_properties(
2347                "",
2348                QoS::AtMostOnce,
2349                false,
2350                "good bye",
2351                PublishProperties {
2352                    topic_alias: Some(0),
2353                    ..Default::default()
2354                },
2355            )
2356            .expect_err("Empty topic with topic alias 0 should fail");
2357        assert!(matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_))));
2358    }
2359
2360    #[test]
2361    fn publish_with_properties_empty_topic_accepts_nonzero_alias() {
2362        let (tx, rx) = flume::bounded(1);
2363        let client = Client::from_sender(tx);
2364
2365        client
2366            .publish_with_properties(
2367                "",
2368                QoS::AtMostOnce,
2369                false,
2370                "good bye",
2371                PublishProperties {
2372                    topic_alias: Some(1),
2373                    ..Default::default()
2374                },
2375            )
2376            .expect("Empty topic with non-zero topic alias should be accepted");
2377
2378        let request = rx.try_recv().expect("Should have message");
2379        match request {
2380            Request::Publish(publish) => {
2381                assert!(publish.topic.is_empty());
2382                assert_eq!(
2383                    publish
2384                        .properties
2385                        .as_ref()
2386                        .and_then(|properties| properties.topic_alias),
2387                    Some(1)
2388                );
2389            }
2390            request => panic!("Expected Publish request, got {request:?}"),
2391        }
2392    }
2393
2394    #[test]
2395    fn try_publish_accepts_validated_topic() {
2396        let (tx, rx) = flume::bounded(1);
2397        let client = Client::from_sender(tx);
2398        let valid_topic = ValidatedTopic::new("hello/world").unwrap();
2399        client
2400            .try_publish(valid_topic, QoS::ExactlyOnce, false, "good bye")
2401            .expect("Should be able to publish");
2402        let _ = rx.try_recv().expect("Should have message");
2403    }
2404
2405    #[test]
2406    fn try_publish_with_properties_accepts_validated_topic() {
2407        let (tx, rx) = flume::bounded(1);
2408        let client = Client::from_sender(tx);
2409        let valid_topic = ValidatedTopic::new("hello/world").unwrap();
2410        client
2411            .try_publish_with_properties(
2412                valid_topic,
2413                QoS::ExactlyOnce,
2414                false,
2415                "good bye",
2416                PublishProperties::default(),
2417            )
2418            .expect("Should be able to publish");
2419        let _ = rx.try_recv().expect("Should have message");
2420    }
2421
2422    #[test]
2423    fn try_publish_with_properties_empty_topic_requires_nonzero_alias() {
2424        let (tx, _) = flume::bounded(1);
2425        let client = Client::from_sender(tx);
2426
2427        let err = client
2428            .try_publish_with_properties(
2429                "",
2430                QoS::AtMostOnce,
2431                false,
2432                "good bye",
2433                PublishProperties::default(),
2434            )
2435            .expect_err("Empty topic without topic alias should fail");
2436        assert!(matches!(err, ClientError::TryRequest(req) if matches!(*req, Request::Publish(_))));
2437    }
2438
2439    #[test]
2440    fn publishing_invalid_raw_topic_fails() {
2441        let (tx, _) = flume::bounded(1);
2442        let client = Client::from_sender(tx);
2443        let err = client
2444            .publish("a/+/b", QoS::ExactlyOnce, false, "good bye")
2445            .expect_err("Invalid publish topic should fail");
2446        assert!(matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_))));
2447    }
2448
2449    #[test]
2450    fn async_publish_paths_accept_validated_topic() {
2451        let (tx, rx) = flume::bounded(4);
2452        let client = AsyncClient::from_senders(tx);
2453        let runtime = runtime::Builder::new_current_thread()
2454            .enable_all()
2455            .build()
2456            .unwrap();
2457
2458        runtime.block_on(async {
2459            client
2460                .publish(
2461                    ValidatedTopic::new("hello/world").unwrap(),
2462                    QoS::ExactlyOnce,
2463                    false,
2464                    "good bye",
2465                )
2466                .await
2467                .expect("Should be able to publish");
2468
2469            client
2470                .publish_with_properties(
2471                    ValidatedTopic::new("hello/world").unwrap(),
2472                    QoS::ExactlyOnce,
2473                    false,
2474                    "good bye",
2475                    PublishProperties::default(),
2476                )
2477                .await
2478                .expect("Should be able to publish");
2479
2480            client
2481                .publish_bytes(
2482                    ValidatedTopic::new("hello/world").unwrap(),
2483                    QoS::ExactlyOnce,
2484                    false,
2485                    Bytes::from_static(b"good bye"),
2486                )
2487                .await
2488                .expect("Should be able to publish");
2489
2490            client
2491                .publish_bytes_with_properties(
2492                    ValidatedTopic::new("hello/world").unwrap(),
2493                    QoS::ExactlyOnce,
2494                    false,
2495                    Bytes::from_static(b"good bye"),
2496                    PublishProperties::default(),
2497                )
2498                .await
2499                .expect("Should be able to publish");
2500        });
2501
2502        let _ = rx.try_recv().expect("Should have message");
2503        let _ = rx.try_recv().expect("Should have message");
2504        let _ = rx.try_recv().expect("Should have message");
2505        let _ = rx.try_recv().expect("Should have message");
2506    }
2507
2508    #[test]
2509    fn async_try_publish_paths_accept_validated_topic() {
2510        let (tx, rx) = flume::bounded(4);
2511        let client = AsyncClient::from_senders(tx);
2512
2513        client
2514            .try_publish(
2515                ValidatedTopic::new("hello/world").unwrap(),
2516                QoS::ExactlyOnce,
2517                false,
2518                "good bye",
2519            )
2520            .expect("Should be able to publish");
2521
2522        client
2523            .try_publish_with_properties(
2524                ValidatedTopic::new("hello/world").unwrap(),
2525                QoS::ExactlyOnce,
2526                false,
2527                "good bye",
2528                PublishProperties::default(),
2529            )
2530            .expect("Should be able to publish");
2531
2532        let _ = rx.try_recv().expect("Should have message");
2533        let _ = rx.try_recv().expect("Should have message");
2534    }
2535
2536    #[test]
2537    fn async_publishing_invalid_raw_topic_fails() {
2538        let (tx, _) = flume::bounded(1);
2539        let client = AsyncClient::from_senders(tx);
2540        let runtime = runtime::Builder::new_current_thread()
2541            .enable_all()
2542            .build()
2543            .unwrap();
2544
2545        runtime.block_on(async {
2546            let err = client
2547                .publish("a/+/b", QoS::ExactlyOnce, false, "good bye")
2548                .await
2549                .expect_err("Invalid publish topic should fail");
2550            assert!(
2551                matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_)))
2552            );
2553
2554            let err = client
2555                .publish_bytes(
2556                    "a/+/b",
2557                    QoS::ExactlyOnce,
2558                    false,
2559                    Bytes::from_static(b"good bye"),
2560                )
2561                .await
2562                .expect_err("Invalid publish topic should fail");
2563            assert!(
2564                matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_)))
2565            );
2566
2567            let err = client
2568                .publish_with_properties(
2569                    "",
2570                    QoS::AtMostOnce,
2571                    false,
2572                    "good bye",
2573                    PublishProperties::default(),
2574                )
2575                .await
2576                .expect_err("Empty topic without topic alias should fail");
2577            assert!(
2578                matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_)))
2579            );
2580        });
2581    }
2582
2583    #[test]
2584    fn disconnect_with_properties_builds_disconnect_request() {
2585        let (tx, rx) = flume::bounded(1);
2586        let client = Client::from_sender(tx);
2587        let properties = DisconnectProperties {
2588            session_expiry_interval: Some(120),
2589            reason_string: Some("closing".to_string()),
2590            user_properties: vec![("source".to_string(), "test".to_string())],
2591            server_reference: Some("backup-broker".to_string()),
2592        };
2593
2594        client
2595            .disconnect_with_properties(
2596                DisconnectReasonCode::ImplementationSpecificError,
2597                properties.clone(),
2598            )
2599            .expect("disconnect_with_properties should enqueue request");
2600
2601        let request = rx.try_recv().expect("Should have disconnect request");
2602        match request {
2603            Request::Disconnect(disconnect) => {
2604                assert_eq!(
2605                    disconnect.reason_code,
2606                    DisconnectReasonCode::ImplementationSpecificError
2607                );
2608                assert_eq!(disconnect.properties, Some(properties));
2609            }
2610            request => panic!("Expected disconnect request, got {request:?}"),
2611        }
2612    }
2613
2614    #[test]
2615    fn try_disconnect_with_properties_builds_disconnect_request() {
2616        let (tx, rx) = flume::bounded(1);
2617        let client = Client::from_sender(tx);
2618        let properties = DisconnectProperties {
2619            session_expiry_interval: Some(360),
2620            reason_string: Some("maintenance".to_string()),
2621            user_properties: vec![("env".to_string(), "test".to_string())],
2622            server_reference: None,
2623        };
2624
2625        client
2626            .try_disconnect_with_properties(
2627                DisconnectReasonCode::ServerShuttingDown,
2628                properties.clone(),
2629            )
2630            .expect("try_disconnect_with_properties should enqueue request");
2631
2632        let request = rx.try_recv().expect("Should have disconnect request");
2633        match request {
2634            Request::Disconnect(disconnect) => {
2635                assert_eq!(
2636                    disconnect.reason_code,
2637                    DisconnectReasonCode::ServerShuttingDown
2638                );
2639                assert_eq!(disconnect.properties, Some(properties));
2640            }
2641            request => panic!("Expected disconnect request, got {request:?}"),
2642        }
2643    }
2644
2645    #[test]
2646    fn async_disconnect_with_properties_builds_disconnect_request() {
2647        let (tx, rx) = flume::bounded(1);
2648        let client = AsyncClient::from_senders(tx);
2649        let runtime = runtime::Builder::new_current_thread()
2650            .enable_all()
2651            .build()
2652            .unwrap();
2653        let properties = DisconnectProperties {
2654            session_expiry_interval: Some(42),
2655            reason_string: Some("done".to_string()),
2656            user_properties: vec![("k".to_string(), "v".to_string())],
2657            server_reference: Some("fallback".to_string()),
2658        };
2659
2660        runtime.block_on(async {
2661            client
2662                .disconnect_with_properties(
2663                    DisconnectReasonCode::UseAnotherServer,
2664                    properties.clone(),
2665                )
2666                .await
2667                .expect("disconnect_with_properties should enqueue request");
2668        });
2669
2670        let request = rx.try_recv().expect("Should have disconnect request");
2671        match request {
2672            Request::Disconnect(disconnect) => {
2673                assert_eq!(
2674                    disconnect.reason_code,
2675                    DisconnectReasonCode::UseAnotherServer
2676                );
2677                assert_eq!(disconnect.properties, Some(properties));
2678            }
2679            request => panic!("Expected disconnect request, got {request:?}"),
2680        }
2681    }
2682
2683    #[test]
2684    fn async_try_disconnect_with_properties_builds_disconnect_request() {
2685        let (tx, rx) = flume::bounded(1);
2686        let client = AsyncClient::from_senders(tx);
2687        let properties = DisconnectProperties {
2688            session_expiry_interval: Some(7),
2689            reason_string: Some("bye".to_string()),
2690            user_properties: vec![("actor".to_string(), "test".to_string())],
2691            server_reference: None,
2692        };
2693
2694        client
2695            .try_disconnect_with_properties(
2696                DisconnectReasonCode::AdministrativeAction,
2697                properties.clone(),
2698            )
2699            .expect("try_disconnect_with_properties should enqueue request");
2700
2701        let request = rx.try_recv().expect("Should have disconnect request");
2702        match request {
2703            Request::Disconnect(disconnect) => {
2704                assert_eq!(
2705                    disconnect.reason_code,
2706                    DisconnectReasonCode::AdministrativeAction
2707                );
2708                assert_eq!(disconnect.properties, Some(properties));
2709            }
2710            request => panic!("Expected disconnect request, got {request:?}"),
2711        }
2712    }
2713
2714    #[test]
2715    fn tracked_publish_requires_tracking_channel() {
2716        let (tx, _) = flume::bounded(2);
2717        let client = AsyncClient::from_senders(tx);
2718        let runtime = runtime::Builder::new_current_thread()
2719            .enable_all()
2720            .build()
2721            .unwrap();
2722
2723        runtime.block_on(async {
2724            let err = client
2725                .publish_tracked("hello/world", QoS::AtLeastOnce, false, "good bye")
2726                .await
2727                .expect_err("tracked publish should fail without tracked channel");
2728            assert!(matches!(err, ClientError::TrackingUnavailable));
2729
2730            let err = client
2731                .publish_bytes_tracked(
2732                    "hello/world",
2733                    QoS::AtLeastOnce,
2734                    false,
2735                    Bytes::from_static(b"good bye"),
2736                )
2737                .await
2738                .expect_err("tracked publish bytes should fail without tracked channel");
2739            assert!(matches!(err, ClientError::TrackingUnavailable));
2740
2741            let err = client
2742                .subscribe_tracked("hello/world", QoS::AtLeastOnce)
2743                .await
2744                .expect_err("tracked subscribe should fail without tracked channel");
2745            assert!(matches!(err, ClientError::TrackingUnavailable));
2746
2747            let err = client
2748                .subscribe_many_tracked(vec![Filter::new("hello/world", QoS::AtLeastOnce)])
2749                .await
2750                .expect_err("tracked subscribe many should fail without tracked channel");
2751            assert!(matches!(err, ClientError::TrackingUnavailable));
2752
2753            let err = client
2754                .unsubscribe_tracked("hello/world")
2755                .await
2756                .expect_err("tracked unsubscribe should fail without tracked channel");
2757            assert!(matches!(err, ClientError::TrackingUnavailable));
2758        });
2759
2760        let err = client
2761            .try_subscribe_tracked("hello/world", QoS::AtLeastOnce)
2762            .expect_err("tracked try_subscribe should fail without tracked channel");
2763        assert!(matches!(err, ClientError::TrackingUnavailable));
2764
2765        let err = client
2766            .try_subscribe_many_tracked(vec![Filter::new("hello/world", QoS::AtLeastOnce)])
2767            .expect_err("tracked try_subscribe_many should fail without tracked channel");
2768        assert!(matches!(err, ClientError::TrackingUnavailable));
2769
2770        let err = client
2771            .try_unsubscribe_tracked("hello/world")
2772            .expect_err("tracked try_unsubscribe should fail without tracked channel");
2773        assert!(matches!(err, ClientError::TrackingUnavailable));
2774    }
2775}