1use std::borrow::Cow;
4use std::time::Duration;
5
6use super::eventloop::RequestEnvelope;
7use super::mqttbytes::QoS;
8use super::mqttbytes::v5::{
9 Auth, AuthProperties, AuthReasonCode, Filter, PubAck, PubRec, Publish, PublishProperties,
10 Subscribe, SubscribeProperties, Unsubscribe, UnsubscribeProperties,
11};
12use super::{
13 ConnectionError, Disconnect, DisconnectProperties, DisconnectReasonCode, Event, EventLoop,
14 MqttOptions, Request,
15};
16use crate::notice::{PublishNoticeTx, RequestNoticeTx};
17use crate::{PublishNotice, RequestNotice, valid_filter, valid_topic};
18
19use bytes::Bytes;
20use flume::{SendError, Sender, TrySendError};
21use futures_util::FutureExt;
22use tokio::runtime::{self, Runtime};
23use tokio::time::timeout;
24
25#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)]
27#[error("Invalid MQTT topic: '{0}'")]
28pub struct InvalidTopic(String);
29
30#[derive(Debug, Clone, PartialEq, Eq)]
39pub struct ValidatedTopic(String);
40
41impl ValidatedTopic {
42 pub fn new<S: Into<String>>(topic: S) -> Result<Self, InvalidTopic> {
48 let topic_string = topic.into();
49 if valid_topic(&topic_string) {
50 Ok(Self(topic_string))
51 } else {
52 Err(InvalidTopic(topic_string))
53 }
54 }
55}
56
57impl From<ValidatedTopic> for String {
58 fn from(topic: ValidatedTopic) -> Self {
59 topic.0
60 }
61}
62
63pub enum PublishTopic {
68 Unvalidated(String),
70 Validated(ValidatedTopic),
72}
73
74impl PublishTopic {
75 fn into_string_and_validation(self) -> (String, bool) {
76 match self {
77 Self::Unvalidated(topic) => (topic, true),
78 Self::Validated(topic) => (topic.0, false),
79 }
80 }
81}
82
83impl From<ValidatedTopic> for PublishTopic {
84 fn from(topic: ValidatedTopic) -> Self {
85 Self::Validated(topic)
86 }
87}
88
89impl From<String> for PublishTopic {
90 fn from(topic: String) -> Self {
91 Self::Unvalidated(topic)
92 }
93}
94
95impl From<&str> for PublishTopic {
96 fn from(topic: &str) -> Self {
97 Self::Unvalidated(topic.to_owned())
98 }
99}
100
101impl From<&String> for PublishTopic {
102 fn from(topic: &String) -> Self {
103 Self::Unvalidated(topic.clone())
104 }
105}
106
107impl From<Cow<'_, str>> for PublishTopic {
108 fn from(topic: Cow<'_, str>) -> Self {
109 Self::Unvalidated(topic.into_owned())
110 }
111}
112
113#[derive(Debug, thiserror::Error)]
115pub enum ClientError {
116 #[error("Failed to send mqtt requests to eventloop")]
117 Request(Box<Request>),
118 #[error("Failed to send mqtt requests to eventloop")]
119 TryRequest(Box<Request>),
120 #[error("Tracked request API is unavailable for this client instance")]
121 TrackingUnavailable,
122}
123
124impl From<SendError<Request>> for ClientError {
125 fn from(e: SendError<Request>) -> Self {
126 Self::Request(Box::new(e.into_inner()))
127 }
128}
129
130impl From<TrySendError<Request>> for ClientError {
131 fn from(e: TrySendError<Request>) -> Self {
132 Self::TryRequest(Box::new(e.into_inner()))
133 }
134}
135
136#[derive(Clone, Debug)]
137enum RequestSender {
138 Plain(Sender<Request>),
139 WithNotice(Sender<RequestEnvelope>),
140}
141
142fn into_request(envelope: RequestEnvelope) -> Request {
143 let (request, _notice) = envelope.into_parts();
144 request
145}
146
147fn map_send_envelope_error(err: SendError<RequestEnvelope>) -> ClientError {
148 ClientError::Request(Box::new(into_request(err.into_inner())))
149}
150
151fn map_try_send_envelope_error(err: TrySendError<RequestEnvelope>) -> ClientError {
152 match err {
153 TrySendError::Full(envelope) | TrySendError::Disconnected(envelope) => {
154 ClientError::TryRequest(Box::new(into_request(envelope)))
155 }
156 }
157}
158
159#[derive(Clone, Debug, PartialEq, Eq)]
161pub enum ManualAck {
162 PubAck(PubAck),
163 PubRec(PubRec),
164}
165
166impl ManualAck {
167 fn into_request(self) -> Request {
168 match self {
169 Self::PubAck(ack) => Request::PubAck(ack),
170 Self::PubRec(rec) => Request::PubRec(rec),
171 }
172 }
173}
174
175#[derive(Clone, Debug)]
183pub struct AsyncClient {
184 request_tx: RequestSender,
185}
186
187impl AsyncClient {
188 pub fn new(options: MqttOptions, cap: usize) -> (Self, EventLoop) {
192 let (eventloop, request_tx) = EventLoop::new_for_async_client(options, cap);
193 let client = Self {
194 request_tx: RequestSender::WithNotice(request_tx),
195 };
196
197 (client, eventloop)
198 }
199
200 #[must_use]
205 pub const fn from_senders(request_tx: Sender<Request>) -> Self {
206 Self {
207 request_tx: RequestSender::Plain(request_tx),
208 }
209 }
210
211 async fn send_request_async(&self, request: Request) -> Result<(), ClientError> {
212 match &self.request_tx {
213 RequestSender::Plain(tx) => tx.send_async(request).await.map_err(ClientError::from),
214 RequestSender::WithNotice(tx) => tx
215 .send_async(RequestEnvelope::plain(request))
216 .await
217 .map_err(map_send_envelope_error),
218 }
219 }
220
221 fn try_send_request(&self, request: Request) -> Result<(), ClientError> {
222 match &self.request_tx {
223 RequestSender::Plain(tx) => tx.try_send(request).map_err(ClientError::from),
224 RequestSender::WithNotice(tx) => tx
225 .try_send(RequestEnvelope::plain(request))
226 .map_err(map_try_send_envelope_error),
227 }
228 }
229
230 fn send_request(&self, request: Request) -> Result<(), ClientError> {
231 match &self.request_tx {
232 RequestSender::Plain(tx) => tx.send(request).map_err(ClientError::from),
233 RequestSender::WithNotice(tx) => tx
234 .send(RequestEnvelope::plain(request))
235 .map_err(map_send_envelope_error),
236 }
237 }
238
239 async fn send_tracked_publish_async(
240 &self,
241 publish: Publish,
242 ) -> Result<PublishNotice, ClientError> {
243 let RequestSender::WithNotice(request_tx) = &self.request_tx else {
244 return Err(ClientError::TrackingUnavailable);
245 };
246
247 let (notice_tx, notice) = PublishNoticeTx::new();
248 request_tx
249 .send_async(RequestEnvelope::tracked_publish(publish, notice_tx))
250 .await
251 .map_err(map_send_envelope_error)?;
252 Ok(notice)
253 }
254
255 fn try_send_tracked_publish(&self, publish: Publish) -> Result<PublishNotice, ClientError> {
256 let RequestSender::WithNotice(request_tx) = &self.request_tx else {
257 return Err(ClientError::TrackingUnavailable);
258 };
259
260 let (notice_tx, notice) = PublishNoticeTx::new();
261 request_tx
262 .try_send(RequestEnvelope::tracked_publish(publish, notice_tx))
263 .map_err(map_try_send_envelope_error)?;
264 Ok(notice)
265 }
266
267 async fn send_tracked_subscribe_async(
268 &self,
269 subscribe: Subscribe,
270 ) -> Result<RequestNotice, ClientError> {
271 let RequestSender::WithNotice(request_tx) = &self.request_tx else {
272 return Err(ClientError::TrackingUnavailable);
273 };
274
275 let (notice_tx, notice) = RequestNoticeTx::new();
276 request_tx
277 .send_async(RequestEnvelope::tracked_subscribe(subscribe, notice_tx))
278 .await
279 .map_err(map_send_envelope_error)?;
280 Ok(notice)
281 }
282
283 fn try_send_tracked_subscribe(
284 &self,
285 subscribe: Subscribe,
286 ) -> Result<RequestNotice, ClientError> {
287 let RequestSender::WithNotice(request_tx) = &self.request_tx else {
288 return Err(ClientError::TrackingUnavailable);
289 };
290
291 let (notice_tx, notice) = RequestNoticeTx::new();
292 request_tx
293 .try_send(RequestEnvelope::tracked_subscribe(subscribe, notice_tx))
294 .map_err(map_try_send_envelope_error)?;
295 Ok(notice)
296 }
297
298 async fn send_tracked_unsubscribe_async(
299 &self,
300 unsubscribe: Unsubscribe,
301 ) -> Result<RequestNotice, ClientError> {
302 let RequestSender::WithNotice(request_tx) = &self.request_tx else {
303 return Err(ClientError::TrackingUnavailable);
304 };
305
306 let (notice_tx, notice) = RequestNoticeTx::new();
307 request_tx
308 .send_async(RequestEnvelope::tracked_unsubscribe(unsubscribe, notice_tx))
309 .await
310 .map_err(map_send_envelope_error)?;
311 Ok(notice)
312 }
313
314 fn try_send_tracked_unsubscribe(
315 &self,
316 unsubscribe: Unsubscribe,
317 ) -> Result<RequestNotice, ClientError> {
318 let RequestSender::WithNotice(request_tx) = &self.request_tx else {
319 return Err(ClientError::TrackingUnavailable);
320 };
321
322 let (notice_tx, notice) = RequestNoticeTx::new();
323 request_tx
324 .try_send(RequestEnvelope::tracked_unsubscribe(unsubscribe, notice_tx))
325 .map_err(map_try_send_envelope_error)?;
326 Ok(notice)
327 }
328
329 async fn handle_publish<T, P>(
331 &self,
332 topic: T,
333 qos: QoS,
334 retain: bool,
335 payload: P,
336 properties: Option<PublishProperties>,
337 ) -> Result<(), ClientError>
338 where
339 T: Into<PublishTopic>,
340 P: Into<Bytes>,
341 {
342 let (topic, needs_validation) = topic.into().into_string_and_validation();
343 let invalid_topic = (needs_validation && !valid_topic(&topic))
344 || empty_topic_without_valid_alias(&topic, properties.as_ref());
345 let mut publish = Publish::new(topic, qos, payload, properties);
346 publish.retain = retain;
347 let publish = Request::Publish(publish);
348
349 if invalid_topic {
350 return Err(ClientError::Request(Box::new(publish)));
351 }
352
353 self.send_request_async(publish).await?;
354 Ok(())
355 }
356
357 async fn handle_publish_tracked<T, P>(
358 &self,
359 topic: T,
360 qos: QoS,
361 retain: bool,
362 payload: P,
363 properties: Option<PublishProperties>,
364 ) -> Result<PublishNotice, ClientError>
365 where
366 T: Into<PublishTopic>,
367 P: Into<Bytes>,
368 {
369 let (topic, needs_validation) = topic.into().into_string_and_validation();
370 let invalid_topic = (needs_validation && !valid_topic(&topic))
371 || empty_topic_without_valid_alias(&topic, properties.as_ref());
372 let mut publish = Publish::new(topic, qos, payload, properties);
373 publish.retain = retain;
374 let request = Request::Publish(publish.clone());
375
376 if invalid_topic {
377 return Err(ClientError::Request(Box::new(request)));
378 }
379
380 self.send_tracked_publish_async(publish).await
381 }
382
383 pub async fn publish_with_properties<T, P>(
390 &self,
391 topic: T,
392 qos: QoS,
393 retain: bool,
394 payload: P,
395 properties: PublishProperties,
396 ) -> Result<(), ClientError>
397 where
398 T: Into<PublishTopic>,
399 P: Into<Bytes>,
400 {
401 self.handle_publish(topic, qos, retain, payload, Some(properties))
402 .await
403 }
404
405 pub async fn publish_with_properties_tracked<T, P>(
412 &self,
413 topic: T,
414 qos: QoS,
415 retain: bool,
416 payload: P,
417 properties: PublishProperties,
418 ) -> Result<PublishNotice, ClientError>
419 where
420 T: Into<PublishTopic>,
421 P: Into<Bytes>,
422 {
423 self.handle_publish_tracked(topic, qos, retain, payload, Some(properties))
424 .await
425 }
426
427 pub async fn publish<T, P>(
434 &self,
435 topic: T,
436 qos: QoS,
437 retain: bool,
438 payload: P,
439 ) -> Result<(), ClientError>
440 where
441 T: Into<PublishTopic>,
442 P: Into<Bytes>,
443 {
444 self.handle_publish(topic, qos, retain, payload, None).await
445 }
446
447 pub async fn publish_tracked<T, P>(
454 &self,
455 topic: T,
456 qos: QoS,
457 retain: bool,
458 payload: P,
459 ) -> Result<PublishNotice, ClientError>
460 where
461 T: Into<PublishTopic>,
462 P: Into<Bytes>,
463 {
464 self.handle_publish_tracked(topic, qos, retain, payload, None)
465 .await
466 }
467
468 fn handle_try_publish<T, P>(
470 &self,
471 topic: T,
472 qos: QoS,
473 retain: bool,
474 payload: P,
475 properties: Option<PublishProperties>,
476 ) -> Result<(), ClientError>
477 where
478 T: Into<PublishTopic>,
479 P: Into<Bytes>,
480 {
481 let (topic, needs_validation) = topic.into().into_string_and_validation();
482 let invalid_topic = (needs_validation && !valid_topic(&topic))
483 || empty_topic_without_valid_alias(&topic, properties.as_ref());
484 let mut publish = Publish::new(topic, qos, payload, properties);
485 publish.retain = retain;
486 let publish = Request::Publish(publish);
487
488 if invalid_topic {
489 return Err(ClientError::TryRequest(Box::new(publish)));
490 }
491
492 self.try_send_request(publish)?;
493 Ok(())
494 }
495
496 fn handle_try_publish_tracked<T, P>(
497 &self,
498 topic: T,
499 qos: QoS,
500 retain: bool,
501 payload: P,
502 properties: Option<PublishProperties>,
503 ) -> Result<PublishNotice, ClientError>
504 where
505 T: Into<PublishTopic>,
506 P: Into<Bytes>,
507 {
508 let (topic, needs_validation) = topic.into().into_string_and_validation();
509 let invalid_topic = (needs_validation && !valid_topic(&topic))
510 || empty_topic_without_valid_alias(&topic, properties.as_ref());
511 let mut publish = Publish::new(topic, qos, payload, properties);
512 publish.retain = retain;
513 let request = Request::Publish(publish.clone());
514
515 if invalid_topic {
516 return Err(ClientError::TryRequest(Box::new(request)));
517 }
518
519 self.try_send_tracked_publish(publish)
520 }
521
522 pub fn try_publish_with_properties<T, P>(
529 &self,
530 topic: T,
531 qos: QoS,
532 retain: bool,
533 payload: P,
534 properties: PublishProperties,
535 ) -> Result<(), ClientError>
536 where
537 T: Into<PublishTopic>,
538 P: Into<Bytes>,
539 {
540 self.handle_try_publish(topic, qos, retain, payload, Some(properties))
541 }
542
543 pub fn try_publish_with_properties_tracked<T, P>(
550 &self,
551 topic: T,
552 qos: QoS,
553 retain: bool,
554 payload: P,
555 properties: PublishProperties,
556 ) -> Result<PublishNotice, ClientError>
557 where
558 T: Into<PublishTopic>,
559 P: Into<Bytes>,
560 {
561 self.handle_try_publish_tracked(topic, qos, retain, payload, Some(properties))
562 }
563
564 pub fn try_publish<T, P>(
571 &self,
572 topic: T,
573 qos: QoS,
574 retain: bool,
575 payload: P,
576 ) -> Result<(), ClientError>
577 where
578 T: Into<PublishTopic>,
579 P: Into<Bytes>,
580 {
581 self.handle_try_publish(topic, qos, retain, payload, None)
582 }
583
584 pub fn try_publish_tracked<T, P>(
591 &self,
592 topic: T,
593 qos: QoS,
594 retain: bool,
595 payload: P,
596 ) -> Result<PublishNotice, ClientError>
597 where
598 T: Into<PublishTopic>,
599 P: Into<Bytes>,
600 {
601 self.handle_try_publish_tracked(topic, qos, retain, payload, None)
602 }
603
604 pub const fn prepare_ack(&self, publish: &Publish) -> Option<ManualAck> {
608 prepare_ack(publish)
609 }
610
611 pub async fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
620 self.send_request_async(ack.into_request()).await?;
621 Ok(())
622 }
623
624 pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
631 self.try_send_request(ack.into_request())?;
632 Ok(())
633 }
634
635 pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
643 if let Some(ack) = self.prepare_ack(publish) {
644 self.manual_ack(ack).await?;
645 }
646 Ok(())
647 }
648
649 pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
657 if let Some(ack) = self.prepare_ack(publish) {
658 self.try_manual_ack(ack)?;
659 }
660 Ok(())
661 }
662
663 pub async fn reauth(&self, properties: Option<AuthProperties>) -> Result<(), ClientError> {
669 let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
670 let auth = Request::Auth(auth);
671 self.send_request_async(auth).await?;
672 Ok(())
673 }
674
675 pub fn try_reauth(&self, properties: Option<AuthProperties>) -> Result<(), ClientError> {
682 let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
683 let auth = Request::Auth(auth);
684 self.try_send_request(auth)?;
685 Ok(())
686 }
687
688 async fn handle_publish_bytes<T>(
690 &self,
691 topic: T,
692 qos: QoS,
693 retain: bool,
694 payload: Bytes,
695 properties: Option<PublishProperties>,
696 ) -> Result<(), ClientError>
697 where
698 T: Into<PublishTopic>,
699 {
700 let (topic, needs_validation) = topic.into().into_string_and_validation();
701 let invalid_topic = (needs_validation && !valid_topic(&topic))
702 || empty_topic_without_valid_alias(&topic, properties.as_ref());
703 let mut publish = Publish::new(topic, qos, payload, properties);
704 publish.retain = retain;
705 let publish = Request::Publish(publish);
706
707 if invalid_topic {
708 return Err(ClientError::Request(Box::new(publish)));
709 }
710
711 self.send_request_async(publish).await?;
712 Ok(())
713 }
714
715 async fn handle_publish_bytes_tracked<T>(
716 &self,
717 topic: T,
718 qos: QoS,
719 retain: bool,
720 payload: Bytes,
721 properties: Option<PublishProperties>,
722 ) -> Result<PublishNotice, ClientError>
723 where
724 T: Into<PublishTopic>,
725 {
726 let (topic, needs_validation) = topic.into().into_string_and_validation();
727 let invalid_topic = (needs_validation && !valid_topic(&topic))
728 || empty_topic_without_valid_alias(&topic, properties.as_ref());
729 let mut publish = Publish::new(topic, qos, payload, properties);
730 publish.retain = retain;
731 let request = Request::Publish(publish.clone());
732
733 if invalid_topic {
734 return Err(ClientError::Request(Box::new(request)));
735 }
736
737 self.send_tracked_publish_async(publish).await
738 }
739
740 pub async fn publish_bytes_with_properties<T>(
747 &self,
748 topic: T,
749 qos: QoS,
750 retain: bool,
751 payload: Bytes,
752 properties: PublishProperties,
753 ) -> Result<(), ClientError>
754 where
755 T: Into<PublishTopic>,
756 {
757 self.handle_publish_bytes(topic, qos, retain, payload, Some(properties))
758 .await
759 }
760
761 pub async fn publish_bytes_with_properties_tracked<T>(
768 &self,
769 topic: T,
770 qos: QoS,
771 retain: bool,
772 payload: Bytes,
773 properties: PublishProperties,
774 ) -> Result<PublishNotice, ClientError>
775 where
776 T: Into<PublishTopic>,
777 {
778 self.handle_publish_bytes_tracked(topic, qos, retain, payload, Some(properties))
779 .await
780 }
781
782 pub async fn publish_bytes<T>(
789 &self,
790 topic: T,
791 qos: QoS,
792 retain: bool,
793 payload: Bytes,
794 ) -> Result<(), ClientError>
795 where
796 T: Into<PublishTopic>,
797 {
798 self.handle_publish_bytes(topic, qos, retain, payload, None)
799 .await
800 }
801
802 pub async fn publish_bytes_tracked<T>(
809 &self,
810 topic: T,
811 qos: QoS,
812 retain: bool,
813 payload: Bytes,
814 ) -> Result<PublishNotice, ClientError>
815 where
816 T: Into<PublishTopic>,
817 {
818 self.handle_publish_bytes_tracked(topic, qos, retain, payload, None)
819 .await
820 }
821
822 async fn handle_subscribe<S: Into<String>>(
824 &self,
825 topic: S,
826 qos: QoS,
827 properties: Option<SubscribeProperties>,
828 ) -> Result<(), ClientError> {
829 let filter = Filter::new(topic, qos);
830 let subscribe = Subscribe::new(filter, properties);
831 if !subscribe_has_valid_filters(&subscribe) {
832 return Err(ClientError::Request(Box::new(subscribe.into())));
833 }
834
835 self.send_request_async(subscribe.into()).await?;
836 Ok(())
837 }
838
839 async fn handle_subscribe_tracked<S: Into<String>>(
840 &self,
841 topic: S,
842 qos: QoS,
843 properties: Option<SubscribeProperties>,
844 ) -> Result<RequestNotice, ClientError> {
845 let filter = Filter::new(topic, qos);
846 let subscribe = Subscribe::new(filter, properties);
847 if !subscribe_has_valid_filters(&subscribe) {
848 return Err(ClientError::Request(Box::new(subscribe.into())));
849 }
850
851 self.send_tracked_subscribe_async(subscribe).await
852 }
853
854 pub async fn subscribe_with_properties<S: Into<String>>(
861 &self,
862 topic: S,
863 qos: QoS,
864 properties: SubscribeProperties,
865 ) -> Result<(), ClientError> {
866 self.handle_subscribe(topic, qos, Some(properties)).await
867 }
868
869 pub async fn subscribe_with_properties_tracked<S: Into<String>>(
876 &self,
877 topic: S,
878 qos: QoS,
879 properties: SubscribeProperties,
880 ) -> Result<RequestNotice, ClientError> {
881 self.handle_subscribe_tracked(topic, qos, Some(properties))
882 .await
883 }
884
885 pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
892 self.handle_subscribe(topic, qos, None).await
893 }
894
895 pub async fn subscribe_tracked<S: Into<String>>(
902 &self,
903 topic: S,
904 qos: QoS,
905 ) -> Result<RequestNotice, ClientError> {
906 self.handle_subscribe_tracked(topic, qos, None).await
907 }
908
909 fn handle_try_subscribe<S: Into<String>>(
911 &self,
912 topic: S,
913 qos: QoS,
914 properties: Option<SubscribeProperties>,
915 ) -> Result<(), ClientError> {
916 let filter = Filter::new(topic, qos);
917 let subscribe = Subscribe::new(filter, properties);
918 if !subscribe_has_valid_filters(&subscribe) {
919 return Err(ClientError::TryRequest(Box::new(subscribe.into())));
920 }
921
922 self.try_send_request(subscribe.into())?;
923 Ok(())
924 }
925
926 fn handle_try_subscribe_tracked<S: Into<String>>(
927 &self,
928 topic: S,
929 qos: QoS,
930 properties: Option<SubscribeProperties>,
931 ) -> Result<RequestNotice, ClientError> {
932 let filter = Filter::new(topic, qos);
933 let subscribe = Subscribe::new(filter, properties);
934 if !subscribe_has_valid_filters(&subscribe) {
935 return Err(ClientError::TryRequest(Box::new(subscribe.into())));
936 }
937
938 self.try_send_tracked_subscribe(subscribe)
939 }
940
941 pub fn try_subscribe_with_properties<S: Into<String>>(
948 &self,
949 topic: S,
950 qos: QoS,
951 properties: SubscribeProperties,
952 ) -> Result<(), ClientError> {
953 self.handle_try_subscribe(topic, qos, Some(properties))
954 }
955
956 pub fn try_subscribe_with_properties_tracked<S: Into<String>>(
963 &self,
964 topic: S,
965 qos: QoS,
966 properties: SubscribeProperties,
967 ) -> Result<RequestNotice, ClientError> {
968 self.handle_try_subscribe_tracked(topic, qos, Some(properties))
969 }
970
971 pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
978 self.handle_try_subscribe(topic, qos, None)
979 }
980
981 pub fn try_subscribe_tracked<S: Into<String>>(
988 &self,
989 topic: S,
990 qos: QoS,
991 ) -> Result<RequestNotice, ClientError> {
992 self.handle_try_subscribe_tracked(topic, qos, None)
993 }
994
995 async fn handle_subscribe_many<T>(
997 &self,
998 topics: T,
999 properties: Option<SubscribeProperties>,
1000 ) -> Result<(), ClientError>
1001 where
1002 T: IntoIterator<Item = Filter>,
1003 {
1004 let subscribe = Subscribe::new_many(topics, properties);
1005 if !subscribe_has_valid_filters(&subscribe) {
1006 return Err(ClientError::Request(Box::new(subscribe.into())));
1007 }
1008
1009 self.send_request_async(subscribe.into()).await?;
1010
1011 Ok(())
1012 }
1013
1014 async fn handle_subscribe_many_tracked<T>(
1015 &self,
1016 topics: T,
1017 properties: Option<SubscribeProperties>,
1018 ) -> Result<RequestNotice, ClientError>
1019 where
1020 T: IntoIterator<Item = Filter>,
1021 {
1022 let subscribe = Subscribe::new_many(topics, properties);
1023 if !subscribe_has_valid_filters(&subscribe) {
1024 return Err(ClientError::Request(Box::new(subscribe.into())));
1025 }
1026
1027 self.send_tracked_subscribe_async(subscribe).await
1028 }
1029
1030 pub async fn subscribe_many_with_properties<T>(
1037 &self,
1038 topics: T,
1039 properties: SubscribeProperties,
1040 ) -> Result<(), ClientError>
1041 where
1042 T: IntoIterator<Item = Filter>,
1043 {
1044 self.handle_subscribe_many(topics, Some(properties)).await
1045 }
1046
1047 pub async fn subscribe_many_with_properties_tracked<T>(
1054 &self,
1055 topics: T,
1056 properties: SubscribeProperties,
1057 ) -> Result<RequestNotice, ClientError>
1058 where
1059 T: IntoIterator<Item = Filter>,
1060 {
1061 self.handle_subscribe_many_tracked(topics, Some(properties))
1062 .await
1063 }
1064
1065 pub async fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
1072 where
1073 T: IntoIterator<Item = Filter>,
1074 {
1075 self.handle_subscribe_many(topics, None).await
1076 }
1077
1078 pub async fn subscribe_many_tracked<T>(&self, topics: T) -> Result<RequestNotice, ClientError>
1085 where
1086 T: IntoIterator<Item = Filter>,
1087 {
1088 self.handle_subscribe_many_tracked(topics, None).await
1089 }
1090
1091 fn handle_try_subscribe_many<T>(
1093 &self,
1094 topics: T,
1095 properties: Option<SubscribeProperties>,
1096 ) -> Result<(), ClientError>
1097 where
1098 T: IntoIterator<Item = Filter>,
1099 {
1100 let subscribe = Subscribe::new_many(topics, properties);
1101 if !subscribe_has_valid_filters(&subscribe) {
1102 return Err(ClientError::TryRequest(Box::new(subscribe.into())));
1103 }
1104
1105 self.try_send_request(subscribe.into())?;
1106 Ok(())
1107 }
1108
1109 fn handle_try_subscribe_many_tracked<T>(
1110 &self,
1111 topics: T,
1112 properties: Option<SubscribeProperties>,
1113 ) -> Result<RequestNotice, ClientError>
1114 where
1115 T: IntoIterator<Item = Filter>,
1116 {
1117 let subscribe = Subscribe::new_many(topics, properties);
1118 if !subscribe_has_valid_filters(&subscribe) {
1119 return Err(ClientError::TryRequest(Box::new(subscribe.into())));
1120 }
1121
1122 self.try_send_tracked_subscribe(subscribe)
1123 }
1124
1125 pub fn try_subscribe_many_with_properties<T>(
1132 &self,
1133 topics: T,
1134 properties: SubscribeProperties,
1135 ) -> Result<(), ClientError>
1136 where
1137 T: IntoIterator<Item = Filter>,
1138 {
1139 self.handle_try_subscribe_many(topics, Some(properties))
1140 }
1141
1142 pub fn try_subscribe_many_with_properties_tracked<T>(
1149 &self,
1150 topics: T,
1151 properties: SubscribeProperties,
1152 ) -> Result<RequestNotice, ClientError>
1153 where
1154 T: IntoIterator<Item = Filter>,
1155 {
1156 self.handle_try_subscribe_many_tracked(topics, Some(properties))
1157 }
1158
1159 pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
1166 where
1167 T: IntoIterator<Item = Filter>,
1168 {
1169 self.handle_try_subscribe_many(topics, None)
1170 }
1171
1172 pub fn try_subscribe_many_tracked<T>(&self, topics: T) -> Result<RequestNotice, ClientError>
1179 where
1180 T: IntoIterator<Item = Filter>,
1181 {
1182 self.handle_try_subscribe_many_tracked(topics, None)
1183 }
1184
1185 async fn handle_unsubscribe<S: Into<String>>(
1187 &self,
1188 topic: S,
1189 properties: Option<UnsubscribeProperties>,
1190 ) -> Result<(), ClientError> {
1191 let unsubscribe = Unsubscribe::new(topic, properties);
1192 let request = Request::Unsubscribe(unsubscribe);
1193 self.send_request_async(request).await?;
1194 Ok(())
1195 }
1196
1197 async fn handle_unsubscribe_tracked<S: Into<String>>(
1198 &self,
1199 topic: S,
1200 properties: Option<UnsubscribeProperties>,
1201 ) -> Result<RequestNotice, ClientError> {
1202 let unsubscribe = Unsubscribe::new(topic, properties);
1203 self.send_tracked_unsubscribe_async(unsubscribe).await
1204 }
1205
1206 pub async fn unsubscribe_with_properties<S: Into<String>>(
1212 &self,
1213 topic: S,
1214 properties: UnsubscribeProperties,
1215 ) -> Result<(), ClientError> {
1216 self.handle_unsubscribe(topic, Some(properties)).await
1217 }
1218
1219 pub async fn unsubscribe_with_properties_tracked<S: Into<String>>(
1225 &self,
1226 topic: S,
1227 properties: UnsubscribeProperties,
1228 ) -> Result<RequestNotice, ClientError> {
1229 self.handle_unsubscribe_tracked(topic, Some(properties))
1230 .await
1231 }
1232
1233 pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
1239 self.handle_unsubscribe(topic, None).await
1240 }
1241
1242 pub async fn unsubscribe_tracked<S: Into<String>>(
1248 &self,
1249 topic: S,
1250 ) -> Result<RequestNotice, ClientError> {
1251 self.handle_unsubscribe_tracked(topic, None).await
1252 }
1253
1254 fn handle_try_unsubscribe<S: Into<String>>(
1256 &self,
1257 topic: S,
1258 properties: Option<UnsubscribeProperties>,
1259 ) -> Result<(), ClientError> {
1260 let unsubscribe = Unsubscribe::new(topic, properties);
1261 let request = Request::Unsubscribe(unsubscribe);
1262 self.try_send_request(request)?;
1263 Ok(())
1264 }
1265
1266 fn handle_try_unsubscribe_tracked<S: Into<String>>(
1267 &self,
1268 topic: S,
1269 properties: Option<UnsubscribeProperties>,
1270 ) -> Result<RequestNotice, ClientError> {
1271 let unsubscribe = Unsubscribe::new(topic, properties);
1272 self.try_send_tracked_unsubscribe(unsubscribe)
1273 }
1274
1275 pub fn try_unsubscribe_with_properties<S: Into<String>>(
1282 &self,
1283 topic: S,
1284 properties: UnsubscribeProperties,
1285 ) -> Result<(), ClientError> {
1286 self.handle_try_unsubscribe(topic, Some(properties))
1287 }
1288
1289 pub fn try_unsubscribe_with_properties_tracked<S: Into<String>>(
1296 &self,
1297 topic: S,
1298 properties: UnsubscribeProperties,
1299 ) -> Result<RequestNotice, ClientError> {
1300 self.handle_try_unsubscribe_tracked(topic, Some(properties))
1301 }
1302
1303 pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
1310 self.handle_try_unsubscribe(topic, None)
1311 }
1312
1313 pub fn try_unsubscribe_tracked<S: Into<String>>(
1320 &self,
1321 topic: S,
1322 ) -> Result<RequestNotice, ClientError> {
1323 self.handle_try_unsubscribe_tracked(topic, None)
1324 }
1325
1326 pub async fn disconnect(&self) -> Result<(), ClientError> {
1333 self.handle_disconnect(DisconnectReasonCode::NormalDisconnection, None)
1334 .await
1335 }
1336
1337 pub async fn disconnect_with_properties(
1344 &self,
1345 reason: DisconnectReasonCode,
1346 properties: DisconnectProperties,
1347 ) -> Result<(), ClientError> {
1348 self.handle_disconnect(reason, Some(properties)).await
1349 }
1350
1351 async fn handle_disconnect(
1353 &self,
1354 reason: DisconnectReasonCode,
1355 properties: Option<DisconnectProperties>,
1356 ) -> Result<(), ClientError> {
1357 let request = Self::build_disconnect_request(reason, properties);
1358 self.send_request_async(request).await?;
1359 Ok(())
1360 }
1361
1362 pub fn try_disconnect(&self) -> Result<(), ClientError> {
1369 self.handle_try_disconnect(DisconnectReasonCode::NormalDisconnection, None)
1370 }
1371
1372 pub fn try_disconnect_with_properties(
1379 &self,
1380 reason: DisconnectReasonCode,
1381 properties: DisconnectProperties,
1382 ) -> Result<(), ClientError> {
1383 self.handle_try_disconnect(reason, Some(properties))
1384 }
1385
1386 fn handle_try_disconnect(
1388 &self,
1389 reason: DisconnectReasonCode,
1390 properties: Option<DisconnectProperties>,
1391 ) -> Result<(), ClientError> {
1392 let request = Self::build_disconnect_request(reason, properties);
1393 self.try_send_request(request)?;
1394 Ok(())
1395 }
1396
1397 fn build_disconnect_request(
1399 reason: DisconnectReasonCode,
1400 properties: Option<DisconnectProperties>,
1401 ) -> Request {
1402 properties.map_or_else(
1403 || Request::Disconnect(Disconnect::new(reason)),
1404 |p| Request::Disconnect(Disconnect::new_with_properties(reason, p)),
1405 )
1406 }
1407}
1408
1409const fn prepare_ack(publish: &Publish) -> Option<ManualAck> {
1410 let ack = match publish.qos {
1411 QoS::AtMostOnce => return None,
1412 QoS::AtLeastOnce => ManualAck::PubAck(PubAck::new(publish.pkid, None)),
1413 QoS::ExactlyOnce => ManualAck::PubRec(PubRec::new(publish.pkid, None)),
1414 };
1415 Some(ack)
1416}
1417
1418#[derive(Clone)]
1429pub struct Client {
1430 client: AsyncClient,
1431}
1432
1433impl Client {
1434 pub fn new(options: MqttOptions, cap: usize) -> (Self, Connection) {
1442 let (client, eventloop) = AsyncClient::new(options, cap);
1443 let client = Self { client };
1444
1445 let runtime = runtime::Builder::new_current_thread()
1446 .enable_all()
1447 .build()
1448 .unwrap();
1449
1450 let connection = Connection::new(eventloop, runtime);
1451 (client, connection)
1452 }
1453
1454 #[must_use]
1459 pub const fn from_sender(request_tx: Sender<Request>) -> Self {
1460 Self {
1461 client: AsyncClient::from_senders(request_tx),
1462 }
1463 }
1464
1465 fn handle_publish<T, P>(
1467 &self,
1468 topic: T,
1469 qos: QoS,
1470 retain: bool,
1471 payload: P,
1472 properties: Option<PublishProperties>,
1473 ) -> Result<(), ClientError>
1474 where
1475 T: Into<PublishTopic>,
1476 P: Into<Bytes>,
1477 {
1478 let (topic, needs_validation) = topic.into().into_string_and_validation();
1479 let invalid_topic = (needs_validation && !valid_topic(&topic))
1480 || empty_topic_without_valid_alias(&topic, properties.as_ref());
1481 let mut publish = Publish::new(topic, qos, payload, properties);
1482 publish.retain = retain;
1483 let request = Request::Publish(publish);
1484
1485 if invalid_topic {
1486 return Err(ClientError::Request(Box::new(request)));
1487 }
1488
1489 self.client.send_request(request)?;
1490 Ok(())
1491 }
1492
1493 pub fn publish_with_properties<T, P>(
1500 &self,
1501 topic: T,
1502 qos: QoS,
1503 retain: bool,
1504 payload: P,
1505 properties: PublishProperties,
1506 ) -> Result<(), ClientError>
1507 where
1508 T: Into<PublishTopic>,
1509 P: Into<Bytes>,
1510 {
1511 self.handle_publish(topic, qos, retain, payload, Some(properties))
1512 }
1513
1514 pub fn publish<T, P>(
1521 &self,
1522 topic: T,
1523 qos: QoS,
1524 retain: bool,
1525 payload: P,
1526 ) -> Result<(), ClientError>
1527 where
1528 T: Into<PublishTopic>,
1529 P: Into<Bytes>,
1530 {
1531 self.handle_publish(topic, qos, retain, payload, None)
1532 }
1533
1534 pub fn try_publish_with_properties<T, P>(
1541 &self,
1542 topic: T,
1543 qos: QoS,
1544 retain: bool,
1545 payload: P,
1546 properties: PublishProperties,
1547 ) -> Result<(), ClientError>
1548 where
1549 T: Into<PublishTopic>,
1550 P: Into<Bytes>,
1551 {
1552 self.client
1553 .try_publish_with_properties(topic, qos, retain, payload, properties)
1554 }
1555
1556 pub fn try_publish<T, P>(
1563 &self,
1564 topic: T,
1565 qos: QoS,
1566 retain: bool,
1567 payload: P,
1568 ) -> Result<(), ClientError>
1569 where
1570 T: Into<PublishTopic>,
1571 P: Into<Bytes>,
1572 {
1573 self.client.try_publish(topic, qos, retain, payload)
1574 }
1575
1576 pub const fn prepare_ack(&self, publish: &Publish) -> Option<ManualAck> {
1580 self.client.prepare_ack(publish)
1581 }
1582
1583 pub fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
1590 self.client.send_request(ack.into_request())?;
1591 Ok(())
1592 }
1593
1594 pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
1601 self.client.try_manual_ack(ack)?;
1602 Ok(())
1603 }
1604
1605 pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
1613 if let Some(ack) = self.prepare_ack(publish) {
1614 self.manual_ack(ack)?;
1615 }
1616 Ok(())
1617 }
1618
1619 pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
1627 if let Some(ack) = self.prepare_ack(publish) {
1628 self.try_manual_ack(ack)?;
1629 }
1630 Ok(())
1631 }
1632
1633 pub fn reauth(&self, properties: Option<AuthProperties>) -> Result<(), ClientError> {
1639 let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
1640 let auth = Request::Auth(auth);
1641 self.client.send_request(auth)?;
1642 Ok(())
1643 }
1644
1645 pub fn try_reauth(&self, properties: Option<AuthProperties>) -> Result<(), ClientError> {
1652 let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
1653 let auth = Request::Auth(auth);
1654 self.client.try_send_request(auth)?;
1655 Ok(())
1656 }
1657
1658 fn handle_subscribe<S: Into<String>>(
1660 &self,
1661 topic: S,
1662 qos: QoS,
1663 properties: Option<SubscribeProperties>,
1664 ) -> Result<(), ClientError> {
1665 let filter = Filter::new(topic, qos);
1666 let subscribe = Subscribe::new(filter, properties);
1667 if !subscribe_has_valid_filters(&subscribe) {
1668 return Err(ClientError::Request(Box::new(subscribe.into())));
1669 }
1670
1671 self.client.send_request(subscribe.into())?;
1672 Ok(())
1673 }
1674
1675 pub fn subscribe_with_properties<S: Into<String>>(
1682 &self,
1683 topic: S,
1684 qos: QoS,
1685 properties: SubscribeProperties,
1686 ) -> Result<(), ClientError> {
1687 self.handle_subscribe(topic, qos, Some(properties))
1688 }
1689
1690 pub fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
1697 self.handle_subscribe(topic, qos, None)
1698 }
1699
1700 pub fn try_subscribe_with_properties<S: Into<String>>(
1707 &self,
1708 topic: S,
1709 qos: QoS,
1710 properties: SubscribeProperties,
1711 ) -> Result<(), ClientError> {
1712 self.client
1713 .try_subscribe_with_properties(topic, qos, properties)
1714 }
1715
1716 pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
1723 self.client.try_subscribe(topic, qos)
1724 }
1725
1726 fn handle_subscribe_many<T>(
1728 &self,
1729 topics: T,
1730 properties: Option<SubscribeProperties>,
1731 ) -> Result<(), ClientError>
1732 where
1733 T: IntoIterator<Item = Filter>,
1734 {
1735 let subscribe = Subscribe::new_many(topics, properties);
1736 if !subscribe_has_valid_filters(&subscribe) {
1737 return Err(ClientError::Request(Box::new(subscribe.into())));
1738 }
1739
1740 self.client.send_request(subscribe.into())?;
1741 Ok(())
1742 }
1743
1744 pub fn subscribe_many_with_properties<T>(
1751 &self,
1752 topics: T,
1753 properties: SubscribeProperties,
1754 ) -> Result<(), ClientError>
1755 where
1756 T: IntoIterator<Item = Filter>,
1757 {
1758 self.handle_subscribe_many(topics, Some(properties))
1759 }
1760
1761 pub fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
1768 where
1769 T: IntoIterator<Item = Filter>,
1770 {
1771 self.handle_subscribe_many(topics, None)
1772 }
1773
1774 pub fn try_subscribe_many_with_properties<T>(
1781 &self,
1782 topics: T,
1783 properties: SubscribeProperties,
1784 ) -> Result<(), ClientError>
1785 where
1786 T: IntoIterator<Item = Filter>,
1787 {
1788 self.client
1789 .try_subscribe_many_with_properties(topics, properties)
1790 }
1791
1792 pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
1799 where
1800 T: IntoIterator<Item = Filter>,
1801 {
1802 self.client.try_subscribe_many(topics)
1803 }
1804
1805 fn handle_unsubscribe<S: Into<String>>(
1807 &self,
1808 topic: S,
1809 properties: Option<UnsubscribeProperties>,
1810 ) -> Result<(), ClientError> {
1811 let unsubscribe = Unsubscribe::new(topic, properties);
1812 let request = Request::Unsubscribe(unsubscribe);
1813 self.client.send_request(request)?;
1814 Ok(())
1815 }
1816
1817 pub fn unsubscribe_with_properties<S: Into<String>>(
1823 &self,
1824 topic: S,
1825 properties: UnsubscribeProperties,
1826 ) -> Result<(), ClientError> {
1827 self.handle_unsubscribe(topic, Some(properties))
1828 }
1829
1830 pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
1836 self.handle_unsubscribe(topic, None)
1837 }
1838
1839 pub fn try_unsubscribe_with_properties<S: Into<String>>(
1846 &self,
1847 topic: S,
1848 properties: UnsubscribeProperties,
1849 ) -> Result<(), ClientError> {
1850 self.client
1851 .try_unsubscribe_with_properties(topic, properties)
1852 }
1853
1854 pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
1861 self.client.try_unsubscribe(topic)
1862 }
1863
1864 pub fn disconnect(&self) -> Result<(), ClientError> {
1871 self.handle_disconnect(DisconnectReasonCode::NormalDisconnection, None)
1872 }
1873
1874 pub fn disconnect_with_properties(
1881 &self,
1882 reason: DisconnectReasonCode,
1883 properties: DisconnectProperties,
1884 ) -> Result<(), ClientError> {
1885 self.handle_disconnect(reason, Some(properties))
1886 }
1887
1888 fn handle_disconnect(
1889 &self,
1890 reason: DisconnectReasonCode,
1891 properties: Option<DisconnectProperties>,
1892 ) -> Result<(), ClientError> {
1893 let request = AsyncClient::build_disconnect_request(reason, properties);
1894 self.client.send_request(request)?;
1895 Ok(())
1896 }
1897
1898 pub fn try_disconnect(&self) -> Result<(), ClientError> {
1905 self.client.try_disconnect()
1906 }
1907
1908 pub fn try_disconnect_with_properties(
1915 &self,
1916 reason: DisconnectReasonCode,
1917 properties: DisconnectProperties,
1918 ) -> Result<(), ClientError> {
1919 self.client.handle_try_disconnect(reason, Some(properties))
1920 }
1921}
1922
1923#[must_use]
1924fn empty_topic_without_valid_alias(topic: &str, properties: Option<&PublishProperties>) -> bool {
1925 topic.is_empty()
1926 && properties
1927 .and_then(|props| props.topic_alias)
1928 .unwrap_or_default()
1929 == 0
1930}
1931
1932#[must_use]
1933fn subscribe_has_valid_filters(subscribe: &Subscribe) -> bool {
1934 !subscribe.filters.is_empty()
1935 && subscribe
1936 .filters
1937 .iter()
1938 .all(|filter| valid_filter(&filter.path))
1939}
1940
1941#[derive(Debug, Eq, PartialEq)]
1943pub struct RecvError;
1944
1945#[derive(Debug, Eq, PartialEq)]
1947pub enum TryRecvError {
1948 Disconnected,
1950 Empty,
1952}
1953
1954#[derive(Debug, Eq, PartialEq)]
1956pub enum RecvTimeoutError {
1957 Disconnected,
1959 Timeout,
1961}
1962
1963pub struct Connection {
1965 pub eventloop: EventLoop,
1966 runtime: Runtime,
1967}
1968impl Connection {
1969 const fn new(eventloop: EventLoop, runtime: Runtime) -> Self {
1970 Self { eventloop, runtime }
1971 }
1972
1973 #[must_use = "Connection should be iterated over a loop to make progress"]
1980 pub const fn iter(&mut self) -> Iter<'_> {
1981 Iter { connection: self }
1982 }
1983
1984 pub fn recv(&mut self) -> Result<Result<Event, ConnectionError>, RecvError> {
1993 let f = self.eventloop.poll();
1994 let event = self.runtime.block_on(f);
1995
1996 resolve_event(event).ok_or(RecvError)
1997 }
1998
1999 pub fn try_recv(&mut self) -> Result<Result<Event, ConnectionError>, TryRecvError> {
2009 let f = self.eventloop.poll();
2010 let _guard = self.runtime.enter();
2013 let event = f.now_or_never().ok_or(TryRecvError::Empty)?;
2014
2015 resolve_event(event).ok_or(TryRecvError::Disconnected)
2016 }
2017
2018 pub fn recv_timeout(
2029 &mut self,
2030 duration: Duration,
2031 ) -> Result<Result<Event, ConnectionError>, RecvTimeoutError> {
2032 let f = self.eventloop.poll();
2033 let event = self
2034 .runtime
2035 .block_on(async { timeout(duration, f).await })
2036 .map_err(|_| RecvTimeoutError::Timeout)?;
2037
2038 resolve_event(event).ok_or(RecvTimeoutError::Disconnected)
2039 }
2040}
2041
2042fn resolve_event(event: Result<Event, ConnectionError>) -> Option<Result<Event, ConnectionError>> {
2043 match event {
2044 Ok(v) => Some(Ok(v)),
2045 Err(ConnectionError::RequestsDone) => {
2047 trace!("Done with requests");
2048 None
2049 }
2050 Err(e) => Some(Err(e)),
2051 }
2052}
2053
2054pub struct Iter<'a> {
2056 connection: &'a mut Connection,
2057}
2058
2059impl Iterator for Iter<'_> {
2060 type Item = Result<Event, ConnectionError>;
2061
2062 fn next(&mut self) -> Option<Self::Item> {
2063 self.connection.recv().ok()
2064 }
2065}
2066
2067#[cfg(test)]
2068mod test {
2069 use crate::mqttbytes::v5::{
2070 LastWill, PubAckProperties, PubAckReason, PubRecProperties, PubRecReason,
2071 };
2072
2073 use super::*;
2074
2075 #[test]
2076 fn calling_iter_twice_on_connection_shouldnt_panic() {
2077 let mut mqttoptions = MqttOptions::new("test-1", "localhost");
2078 let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false, None);
2079 mqttoptions.set_keep_alive(5).set_last_will(will);
2080
2081 let (_, mut connection) = Client::new(mqttoptions, 10);
2082 let _ = connection.iter();
2083 let _ = connection.iter();
2084 }
2085
2086 #[test]
2087 fn should_be_able_to_build_test_client_from_channel() {
2088 let (tx, rx) = flume::bounded(1);
2089 let client = Client::from_sender(tx);
2090 client
2091 .publish("hello/world", QoS::ExactlyOnce, false, "good bye")
2092 .expect("Should be able to publish");
2093 let _ = rx.try_recv().expect("Should have message");
2094 }
2095
2096 #[test]
2097 fn prepare_ack_maps_qos_to_manual_ack_packets_v5() {
2098 let (tx, _) = flume::bounded(1);
2099 let client = Client::from_sender(tx);
2100
2101 let qos0 = Publish::new("hello/world", QoS::AtMostOnce, vec![1], None);
2102 assert!(client.prepare_ack(&qos0).is_none());
2103
2104 let mut qos1 = Publish::new("hello/world", QoS::AtLeastOnce, vec![1], None);
2105 qos1.pkid = 7;
2106 match client.prepare_ack(&qos1) {
2107 Some(ManualAck::PubAck(ack)) => assert_eq!(ack.pkid, 7),
2108 ack => panic!("expected QoS1 PubAck, got {ack:?}"),
2109 }
2110
2111 let mut qos2 = Publish::new("hello/world", QoS::ExactlyOnce, vec![1], None);
2112 qos2.pkid = 9;
2113 match client.prepare_ack(&qos2) {
2114 Some(ManualAck::PubRec(ack)) => assert_eq!(ack.pkid, 9),
2115 ack => panic!("expected QoS2 PubRec, got {ack:?}"),
2116 }
2117 }
2118
2119 #[test]
2120 fn manual_ack_sends_custom_puback_reason_and_properties() {
2121 let (tx, rx) = flume::bounded(1);
2122 let client = Client::from_sender(tx);
2123
2124 let expected_properties = PubAckProperties {
2125 reason_string: Some("no downstream subscribers".to_owned()),
2126 user_properties: vec![("source".to_owned(), "unit-test".to_owned())],
2127 };
2128 let mut ack = PubAck::new(41, None);
2129 ack.reason = PubAckReason::NoMatchingSubscribers;
2130 ack.properties = Some(expected_properties.clone());
2131
2132 client
2133 .manual_ack(ManualAck::PubAck(ack))
2134 .expect("manual_ack should send request");
2135
2136 let request = rx.try_recv().expect("Should have ack request");
2137 match request {
2138 Request::PubAck(ack) => {
2139 assert_eq!(ack.pkid, 41);
2140 assert_eq!(ack.reason, PubAckReason::NoMatchingSubscribers);
2141 assert_eq!(ack.properties, Some(expected_properties));
2142 }
2143 request => panic!("Expected PubAck request, got {request:?}"),
2144 }
2145 }
2146
2147 #[test]
2148 fn try_manual_ack_sends_custom_pubrec_reason_and_properties() {
2149 let (tx, rx) = flume::bounded(1);
2150 let client = Client::from_sender(tx);
2151
2152 let expected_properties = PubRecProperties {
2153 reason_string: Some("queued for qos2 flow".to_owned()),
2154 user_properties: vec![("source".to_owned(), "unit-test".to_owned())],
2155 };
2156 let mut ack = PubRec::new(52, None);
2157 ack.reason = PubRecReason::ImplementationSpecificError;
2158 ack.properties = Some(expected_properties.clone());
2159
2160 client
2161 .try_manual_ack(ManualAck::PubRec(ack))
2162 .expect("try_manual_ack should send request");
2163
2164 let request = rx.try_recv().expect("Should have ack request");
2165 match request {
2166 Request::PubRec(ack) => {
2167 assert_eq!(ack.pkid, 52);
2168 assert_eq!(ack.reason, PubRecReason::ImplementationSpecificError);
2169 assert_eq!(ack.properties, Some(expected_properties));
2170 }
2171 request => panic!("Expected PubRec request, got {request:?}"),
2172 }
2173 }
2174
2175 #[test]
2176 fn ack_and_try_ack_send_default_success_packets_v5() {
2177 let (tx, rx) = flume::bounded(2);
2178 let client = Client::from_sender(tx);
2179
2180 let mut qos1 = Publish::new("hello/world", QoS::AtLeastOnce, vec![1], None);
2181 qos1.pkid = 11;
2182 client.ack(&qos1).expect("ack should send PubAck");
2183
2184 let mut qos2 = Publish::new("hello/world", QoS::ExactlyOnce, vec![1], None);
2185 qos2.pkid = 13;
2186 client
2187 .try_ack(&qos2)
2188 .expect("try_ack should send PubRec request");
2189
2190 let first = rx.try_recv().expect("Should receive first ack request");
2191 match first {
2192 Request::PubAck(ack) => {
2193 assert_eq!(ack.pkid, 11);
2194 assert_eq!(ack.reason, PubAckReason::Success);
2195 assert_eq!(ack.properties, None);
2196 }
2197 request => panic!("Expected PubAck request, got {request:?}"),
2198 }
2199
2200 let second = rx.try_recv().expect("Should receive second ack request");
2201 match second {
2202 Request::PubRec(ack) => {
2203 assert_eq!(ack.pkid, 13);
2204 assert_eq!(ack.reason, PubRecReason::Success);
2205 assert_eq!(ack.properties, None);
2206 }
2207 request => panic!("Expected PubRec request, got {request:?}"),
2208 }
2209 }
2210
2211 #[test]
2212 fn test_reauth() {
2213 let (client, mut connection) = Client::new(MqttOptions::new("test-1", "localhost"), 10);
2214 let props = AuthProperties {
2215 method: Some("test".to_string()),
2216 data: Some(Bytes::from("test")),
2217 reason: None,
2218 user_properties: vec![],
2219 };
2220 client
2221 .reauth(Some(props.clone()))
2222 .expect("Should be able to reauth");
2223 let _ = connection.iter().next().expect("Should have event");
2224
2225 client
2226 .try_reauth(Some(props))
2227 .expect("Should be able to reauth");
2228 let _ = connection.iter().next().expect("Should have event");
2229 }
2230
2231 #[test]
2232 fn can_publish_with_validated_topic() {
2233 let (tx, rx) = flume::bounded(1);
2234 let client = Client::from_sender(tx);
2235 let valid_topic = ValidatedTopic::new("hello/world").unwrap();
2236 client
2237 .publish(valid_topic, QoS::ExactlyOnce, false, "good bye")
2238 .expect("Should be able to publish");
2239 let _ = rx.try_recv().expect("Should have message");
2240 }
2241
2242 #[test]
2243 fn publish_accepts_borrowed_string_topic() {
2244 let (tx, rx) = flume::bounded(2);
2245 let client = Client::from_sender(tx);
2246 let topic = "hello/world".to_string();
2247 client
2248 .publish(&topic, QoS::ExactlyOnce, false, "good bye")
2249 .expect("Should be able to publish");
2250 client
2251 .try_publish(&topic, QoS::ExactlyOnce, false, "good bye")
2252 .expect("Should be able to publish");
2253 let _ = rx.try_recv().expect("Should have message");
2254 let _ = rx.try_recv().expect("Should have message");
2255 }
2256
2257 #[test]
2258 fn publish_accepts_cow_topic_variants() {
2259 let (tx, rx) = flume::bounded(2);
2260 let client = Client::from_sender(tx);
2261 client
2262 .publish(
2263 std::borrow::Cow::Borrowed("hello/world"),
2264 QoS::ExactlyOnce,
2265 false,
2266 "good bye",
2267 )
2268 .expect("Should be able to publish");
2269 client
2270 .try_publish(
2271 std::borrow::Cow::Owned("hello/world".to_owned()),
2272 QoS::ExactlyOnce,
2273 false,
2274 "good bye",
2275 )
2276 .expect("Should be able to publish");
2277 let _ = rx.try_recv().expect("Should have message");
2278 let _ = rx.try_recv().expect("Should have message");
2279 }
2280
2281 #[test]
2282 fn publishing_invalid_cow_topic_fails() {
2283 let (tx, _) = flume::bounded(1);
2284 let client = Client::from_sender(tx);
2285 let err = client
2286 .publish(
2287 std::borrow::Cow::Borrowed("a/+/b"),
2288 QoS::ExactlyOnce,
2289 false,
2290 "good bye",
2291 )
2292 .expect_err("Invalid publish topic should fail");
2293 assert!(matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_))));
2294 }
2295
2296 #[test]
2297 fn validated_topic_ergonomics() {
2298 let valid_topic = ValidatedTopic::new("hello/world").unwrap();
2299 let valid_topic_can_be_cloned = valid_topic.clone();
2300 assert_eq!(valid_topic, valid_topic_can_be_cloned);
2302 }
2303
2304 #[test]
2305 fn creating_invalid_validated_topic_fails() {
2306 assert_eq!(
2307 ValidatedTopic::new("a/+/b"),
2308 Err(InvalidTopic("a/+/b".to_string()))
2309 );
2310 }
2311
2312 #[test]
2313 fn publish_with_properties_accepts_validated_topic() {
2314 let (tx, rx) = flume::bounded(1);
2315 let client = Client::from_sender(tx);
2316 let valid_topic = ValidatedTopic::new("hello/world").unwrap();
2317 client
2318 .publish_with_properties(
2319 valid_topic,
2320 QoS::ExactlyOnce,
2321 false,
2322 "good bye",
2323 PublishProperties::default(),
2324 )
2325 .expect("Should be able to publish");
2326 let _ = rx.try_recv().expect("Should have message");
2327 }
2328
2329 #[test]
2330 fn publish_with_properties_empty_topic_requires_nonzero_alias() {
2331 let (tx, _) = flume::bounded(1);
2332 let client = Client::from_sender(tx);
2333
2334 let err = client
2335 .publish_with_properties(
2336 "",
2337 QoS::AtMostOnce,
2338 false,
2339 "good bye",
2340 PublishProperties::default(),
2341 )
2342 .expect_err("Empty topic without topic alias should fail");
2343 assert!(matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_))));
2344
2345 let err = client
2346 .publish_with_properties(
2347 "",
2348 QoS::AtMostOnce,
2349 false,
2350 "good bye",
2351 PublishProperties {
2352 topic_alias: Some(0),
2353 ..Default::default()
2354 },
2355 )
2356 .expect_err("Empty topic with topic alias 0 should fail");
2357 assert!(matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_))));
2358 }
2359
2360 #[test]
2361 fn publish_with_properties_empty_topic_accepts_nonzero_alias() {
2362 let (tx, rx) = flume::bounded(1);
2363 let client = Client::from_sender(tx);
2364
2365 client
2366 .publish_with_properties(
2367 "",
2368 QoS::AtMostOnce,
2369 false,
2370 "good bye",
2371 PublishProperties {
2372 topic_alias: Some(1),
2373 ..Default::default()
2374 },
2375 )
2376 .expect("Empty topic with non-zero topic alias should be accepted");
2377
2378 let request = rx.try_recv().expect("Should have message");
2379 match request {
2380 Request::Publish(publish) => {
2381 assert!(publish.topic.is_empty());
2382 assert_eq!(
2383 publish
2384 .properties
2385 .as_ref()
2386 .and_then(|properties| properties.topic_alias),
2387 Some(1)
2388 );
2389 }
2390 request => panic!("Expected Publish request, got {request:?}"),
2391 }
2392 }
2393
2394 #[test]
2395 fn try_publish_accepts_validated_topic() {
2396 let (tx, rx) = flume::bounded(1);
2397 let client = Client::from_sender(tx);
2398 let valid_topic = ValidatedTopic::new("hello/world").unwrap();
2399 client
2400 .try_publish(valid_topic, QoS::ExactlyOnce, false, "good bye")
2401 .expect("Should be able to publish");
2402 let _ = rx.try_recv().expect("Should have message");
2403 }
2404
2405 #[test]
2406 fn try_publish_with_properties_accepts_validated_topic() {
2407 let (tx, rx) = flume::bounded(1);
2408 let client = Client::from_sender(tx);
2409 let valid_topic = ValidatedTopic::new("hello/world").unwrap();
2410 client
2411 .try_publish_with_properties(
2412 valid_topic,
2413 QoS::ExactlyOnce,
2414 false,
2415 "good bye",
2416 PublishProperties::default(),
2417 )
2418 .expect("Should be able to publish");
2419 let _ = rx.try_recv().expect("Should have message");
2420 }
2421
2422 #[test]
2423 fn try_publish_with_properties_empty_topic_requires_nonzero_alias() {
2424 let (tx, _) = flume::bounded(1);
2425 let client = Client::from_sender(tx);
2426
2427 let err = client
2428 .try_publish_with_properties(
2429 "",
2430 QoS::AtMostOnce,
2431 false,
2432 "good bye",
2433 PublishProperties::default(),
2434 )
2435 .expect_err("Empty topic without topic alias should fail");
2436 assert!(matches!(err, ClientError::TryRequest(req) if matches!(*req, Request::Publish(_))));
2437 }
2438
2439 #[test]
2440 fn publishing_invalid_raw_topic_fails() {
2441 let (tx, _) = flume::bounded(1);
2442 let client = Client::from_sender(tx);
2443 let err = client
2444 .publish("a/+/b", QoS::ExactlyOnce, false, "good bye")
2445 .expect_err("Invalid publish topic should fail");
2446 assert!(matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_))));
2447 }
2448
2449 #[test]
2450 fn async_publish_paths_accept_validated_topic() {
2451 let (tx, rx) = flume::bounded(4);
2452 let client = AsyncClient::from_senders(tx);
2453 let runtime = runtime::Builder::new_current_thread()
2454 .enable_all()
2455 .build()
2456 .unwrap();
2457
2458 runtime.block_on(async {
2459 client
2460 .publish(
2461 ValidatedTopic::new("hello/world").unwrap(),
2462 QoS::ExactlyOnce,
2463 false,
2464 "good bye",
2465 )
2466 .await
2467 .expect("Should be able to publish");
2468
2469 client
2470 .publish_with_properties(
2471 ValidatedTopic::new("hello/world").unwrap(),
2472 QoS::ExactlyOnce,
2473 false,
2474 "good bye",
2475 PublishProperties::default(),
2476 )
2477 .await
2478 .expect("Should be able to publish");
2479
2480 client
2481 .publish_bytes(
2482 ValidatedTopic::new("hello/world").unwrap(),
2483 QoS::ExactlyOnce,
2484 false,
2485 Bytes::from_static(b"good bye"),
2486 )
2487 .await
2488 .expect("Should be able to publish");
2489
2490 client
2491 .publish_bytes_with_properties(
2492 ValidatedTopic::new("hello/world").unwrap(),
2493 QoS::ExactlyOnce,
2494 false,
2495 Bytes::from_static(b"good bye"),
2496 PublishProperties::default(),
2497 )
2498 .await
2499 .expect("Should be able to publish");
2500 });
2501
2502 let _ = rx.try_recv().expect("Should have message");
2503 let _ = rx.try_recv().expect("Should have message");
2504 let _ = rx.try_recv().expect("Should have message");
2505 let _ = rx.try_recv().expect("Should have message");
2506 }
2507
2508 #[test]
2509 fn async_try_publish_paths_accept_validated_topic() {
2510 let (tx, rx) = flume::bounded(4);
2511 let client = AsyncClient::from_senders(tx);
2512
2513 client
2514 .try_publish(
2515 ValidatedTopic::new("hello/world").unwrap(),
2516 QoS::ExactlyOnce,
2517 false,
2518 "good bye",
2519 )
2520 .expect("Should be able to publish");
2521
2522 client
2523 .try_publish_with_properties(
2524 ValidatedTopic::new("hello/world").unwrap(),
2525 QoS::ExactlyOnce,
2526 false,
2527 "good bye",
2528 PublishProperties::default(),
2529 )
2530 .expect("Should be able to publish");
2531
2532 let _ = rx.try_recv().expect("Should have message");
2533 let _ = rx.try_recv().expect("Should have message");
2534 }
2535
2536 #[test]
2537 fn async_publishing_invalid_raw_topic_fails() {
2538 let (tx, _) = flume::bounded(1);
2539 let client = AsyncClient::from_senders(tx);
2540 let runtime = runtime::Builder::new_current_thread()
2541 .enable_all()
2542 .build()
2543 .unwrap();
2544
2545 runtime.block_on(async {
2546 let err = client
2547 .publish("a/+/b", QoS::ExactlyOnce, false, "good bye")
2548 .await
2549 .expect_err("Invalid publish topic should fail");
2550 assert!(
2551 matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_)))
2552 );
2553
2554 let err = client
2555 .publish_bytes(
2556 "a/+/b",
2557 QoS::ExactlyOnce,
2558 false,
2559 Bytes::from_static(b"good bye"),
2560 )
2561 .await
2562 .expect_err("Invalid publish topic should fail");
2563 assert!(
2564 matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_)))
2565 );
2566
2567 let err = client
2568 .publish_with_properties(
2569 "",
2570 QoS::AtMostOnce,
2571 false,
2572 "good bye",
2573 PublishProperties::default(),
2574 )
2575 .await
2576 .expect_err("Empty topic without topic alias should fail");
2577 assert!(
2578 matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_)))
2579 );
2580 });
2581 }
2582
2583 #[test]
2584 fn disconnect_with_properties_builds_disconnect_request() {
2585 let (tx, rx) = flume::bounded(1);
2586 let client = Client::from_sender(tx);
2587 let properties = DisconnectProperties {
2588 session_expiry_interval: Some(120),
2589 reason_string: Some("closing".to_string()),
2590 user_properties: vec![("source".to_string(), "test".to_string())],
2591 server_reference: Some("backup-broker".to_string()),
2592 };
2593
2594 client
2595 .disconnect_with_properties(
2596 DisconnectReasonCode::ImplementationSpecificError,
2597 properties.clone(),
2598 )
2599 .expect("disconnect_with_properties should enqueue request");
2600
2601 let request = rx.try_recv().expect("Should have disconnect request");
2602 match request {
2603 Request::Disconnect(disconnect) => {
2604 assert_eq!(
2605 disconnect.reason_code,
2606 DisconnectReasonCode::ImplementationSpecificError
2607 );
2608 assert_eq!(disconnect.properties, Some(properties));
2609 }
2610 request => panic!("Expected disconnect request, got {request:?}"),
2611 }
2612 }
2613
2614 #[test]
2615 fn try_disconnect_with_properties_builds_disconnect_request() {
2616 let (tx, rx) = flume::bounded(1);
2617 let client = Client::from_sender(tx);
2618 let properties = DisconnectProperties {
2619 session_expiry_interval: Some(360),
2620 reason_string: Some("maintenance".to_string()),
2621 user_properties: vec![("env".to_string(), "test".to_string())],
2622 server_reference: None,
2623 };
2624
2625 client
2626 .try_disconnect_with_properties(
2627 DisconnectReasonCode::ServerShuttingDown,
2628 properties.clone(),
2629 )
2630 .expect("try_disconnect_with_properties should enqueue request");
2631
2632 let request = rx.try_recv().expect("Should have disconnect request");
2633 match request {
2634 Request::Disconnect(disconnect) => {
2635 assert_eq!(
2636 disconnect.reason_code,
2637 DisconnectReasonCode::ServerShuttingDown
2638 );
2639 assert_eq!(disconnect.properties, Some(properties));
2640 }
2641 request => panic!("Expected disconnect request, got {request:?}"),
2642 }
2643 }
2644
2645 #[test]
2646 fn async_disconnect_with_properties_builds_disconnect_request() {
2647 let (tx, rx) = flume::bounded(1);
2648 let client = AsyncClient::from_senders(tx);
2649 let runtime = runtime::Builder::new_current_thread()
2650 .enable_all()
2651 .build()
2652 .unwrap();
2653 let properties = DisconnectProperties {
2654 session_expiry_interval: Some(42),
2655 reason_string: Some("done".to_string()),
2656 user_properties: vec![("k".to_string(), "v".to_string())],
2657 server_reference: Some("fallback".to_string()),
2658 };
2659
2660 runtime.block_on(async {
2661 client
2662 .disconnect_with_properties(
2663 DisconnectReasonCode::UseAnotherServer,
2664 properties.clone(),
2665 )
2666 .await
2667 .expect("disconnect_with_properties should enqueue request");
2668 });
2669
2670 let request = rx.try_recv().expect("Should have disconnect request");
2671 match request {
2672 Request::Disconnect(disconnect) => {
2673 assert_eq!(
2674 disconnect.reason_code,
2675 DisconnectReasonCode::UseAnotherServer
2676 );
2677 assert_eq!(disconnect.properties, Some(properties));
2678 }
2679 request => panic!("Expected disconnect request, got {request:?}"),
2680 }
2681 }
2682
2683 #[test]
2684 fn async_try_disconnect_with_properties_builds_disconnect_request() {
2685 let (tx, rx) = flume::bounded(1);
2686 let client = AsyncClient::from_senders(tx);
2687 let properties = DisconnectProperties {
2688 session_expiry_interval: Some(7),
2689 reason_string: Some("bye".to_string()),
2690 user_properties: vec![("actor".to_string(), "test".to_string())],
2691 server_reference: None,
2692 };
2693
2694 client
2695 .try_disconnect_with_properties(
2696 DisconnectReasonCode::AdministrativeAction,
2697 properties.clone(),
2698 )
2699 .expect("try_disconnect_with_properties should enqueue request");
2700
2701 let request = rx.try_recv().expect("Should have disconnect request");
2702 match request {
2703 Request::Disconnect(disconnect) => {
2704 assert_eq!(
2705 disconnect.reason_code,
2706 DisconnectReasonCode::AdministrativeAction
2707 );
2708 assert_eq!(disconnect.properties, Some(properties));
2709 }
2710 request => panic!("Expected disconnect request, got {request:?}"),
2711 }
2712 }
2713
2714 #[test]
2715 fn tracked_publish_requires_tracking_channel() {
2716 let (tx, _) = flume::bounded(2);
2717 let client = AsyncClient::from_senders(tx);
2718 let runtime = runtime::Builder::new_current_thread()
2719 .enable_all()
2720 .build()
2721 .unwrap();
2722
2723 runtime.block_on(async {
2724 let err = client
2725 .publish_tracked("hello/world", QoS::AtLeastOnce, false, "good bye")
2726 .await
2727 .expect_err("tracked publish should fail without tracked channel");
2728 assert!(matches!(err, ClientError::TrackingUnavailable));
2729
2730 let err = client
2731 .publish_bytes_tracked(
2732 "hello/world",
2733 QoS::AtLeastOnce,
2734 false,
2735 Bytes::from_static(b"good bye"),
2736 )
2737 .await
2738 .expect_err("tracked publish bytes should fail without tracked channel");
2739 assert!(matches!(err, ClientError::TrackingUnavailable));
2740
2741 let err = client
2742 .subscribe_tracked("hello/world", QoS::AtLeastOnce)
2743 .await
2744 .expect_err("tracked subscribe should fail without tracked channel");
2745 assert!(matches!(err, ClientError::TrackingUnavailable));
2746
2747 let err = client
2748 .subscribe_many_tracked(vec![Filter::new("hello/world", QoS::AtLeastOnce)])
2749 .await
2750 .expect_err("tracked subscribe many should fail without tracked channel");
2751 assert!(matches!(err, ClientError::TrackingUnavailable));
2752
2753 let err = client
2754 .unsubscribe_tracked("hello/world")
2755 .await
2756 .expect_err("tracked unsubscribe should fail without tracked channel");
2757 assert!(matches!(err, ClientError::TrackingUnavailable));
2758 });
2759
2760 let err = client
2761 .try_subscribe_tracked("hello/world", QoS::AtLeastOnce)
2762 .expect_err("tracked try_subscribe should fail without tracked channel");
2763 assert!(matches!(err, ClientError::TrackingUnavailable));
2764
2765 let err = client
2766 .try_subscribe_many_tracked(vec![Filter::new("hello/world", QoS::AtLeastOnce)])
2767 .expect_err("tracked try_subscribe_many should fail without tracked channel");
2768 assert!(matches!(err, ClientError::TrackingUnavailable));
2769
2770 let err = client
2771 .try_unsubscribe_tracked("hello/world")
2772 .expect_err("tracked try_unsubscribe should fail without tracked channel");
2773 assert!(matches!(err, ClientError::TrackingUnavailable));
2774 }
2775}