1use std::borrow::Cow;
4use std::time::Duration;
5
6use crate::eventloop::{RequestChannelCapacity, RequestEnvelope};
7use crate::mqttbytes::{
8 QoS,
9 v4::{Disconnect, PubAck, PubRec, Publish, Subscribe, SubscribeFilter, Unsubscribe},
10};
11use crate::notice::{PublishNoticeTx, SubscribeNoticeTx, UnsubscribeNoticeTx};
12use crate::{
13 ConnectionError, Event, EventLoop, MqttOptions, PublishNotice, Request, SubscribeNotice,
14 UnsubscribeNotice, valid_filter, valid_topic,
15};
16
17use bytes::Bytes;
18use flume::{SendError, Sender, TrySendError};
19use futures_util::FutureExt;
20use tokio::runtime::{self, Runtime};
21use tokio::time::timeout;
22
/// Error returned when a topic string fails publish-topic validation.
///
/// Wraps the rejected topic string, which is interpolated into the error
/// message.
#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)]
#[error("Invalid MQTT topic: '{0}'")]
pub struct InvalidTopic(String);
27
/// A topic string that has passed publish-topic validation.
///
/// The inner field is private; [`ValidatedTopic::new`] is the constructor that
/// performs the check. Publish calls that receive a `ValidatedTopic` do not
/// validate the topic again.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ValidatedTopic(String);
35
36impl ValidatedTopic {
37 pub fn new<S: Into<String>>(topic: S) -> Result<Self, InvalidTopic> {
43 let topic_string = topic.into();
44 if valid_publish_topic(&topic_string) {
45 Ok(Self(topic_string))
46 } else {
47 Err(InvalidTopic(topic_string))
48 }
49 }
50}
51
52impl From<ValidatedTopic> for String {
53 fn from(topic: ValidatedTopic) -> Self {
54 topic.0
55 }
56}
57
/// Topic argument accepted by the publish APIs.
///
/// Distinguishes raw topic strings, which are validated when the publish
/// request is built, from topics that were validated up front as a
/// [`ValidatedTopic`].
pub enum PublishTopic {
    /// A raw topic string that still has to be validated.
    Unvalidated(String),
    /// A topic that was already validated; it is not checked again.
    Validated(ValidatedTopic),
}
68
69impl PublishTopic {
70 fn into_string_and_validation(self) -> (String, bool) {
71 match self {
72 Self::Unvalidated(topic) => (topic, true),
73 Self::Validated(topic) => (topic.0, false),
74 }
75 }
76}
77
78impl From<ValidatedTopic> for PublishTopic {
79 fn from(topic: ValidatedTopic) -> Self {
80 Self::Validated(topic)
81 }
82}
83
84impl From<String> for PublishTopic {
85 fn from(topic: String) -> Self {
86 Self::Unvalidated(topic)
87 }
88}
89
90impl From<&str> for PublishTopic {
91 fn from(topic: &str) -> Self {
92 Self::Unvalidated(topic.to_owned())
93 }
94}
95
96impl From<&String> for PublishTopic {
97 fn from(topic: &String) -> Self {
98 Self::Unvalidated(topic.clone())
99 }
100}
101
102impl From<Cow<'_, str>> for PublishTopic {
103 fn from(topic: Cow<'_, str>) -> Self {
104 Self::Unvalidated(topic.into_owned())
105 }
106}
107
/// Errors returned by the client request APIs.
#[derive(Debug, Eq, PartialEq, thiserror::Error)]
pub enum ClientError {
    /// A request was rejected by validation or could not be sent through one
    /// of the `send`/`send_async` based APIs. Carries the request.
    #[error("Failed to send mqtt requests to eventloop")]
    Request(Request),
    /// A request was rejected by validation or could not be sent through one
    /// of the `try_*` APIs. Carries the request.
    #[error("Failed to send mqtt requests to eventloop")]
    TryRequest(Request),
    /// A tracked request was attempted on a client that was created from a
    /// plain request sender.
    #[error("Tracked request API is unavailable for this client instance")]
    TrackingUnavailable,
}
118
119impl From<SendError<Request>> for ClientError {
120 fn from(e: SendError<Request>) -> Self {
121 Self::Request(e.into_inner())
122 }
123}
124
125impl From<TrySendError<Request>> for ClientError {
126 fn from(e: TrySendError<Request>) -> Self {
127 Self::TryRequest(e.into_inner())
128 }
129}
130
/// The sending side(s) a client uses to hand requests to the event loop.
#[derive(Clone, Debug)]
enum RequestSender {
    /// A single channel carrying bare `Request` values. Tracked requests are
    /// not available in this mode.
    Plain(Sender<Request>),
    /// Separate channels carrying `RequestEnvelope` values.
    WithNotice {
        /// Channel used for publish requests (tracked and untracked).
        requests: Sender<RequestEnvelope>,
        /// Channel used for every non-publish request sent through the
        /// regular request helpers (subscribe, unsubscribe, acks, disconnect).
        control_requests: Sender<RequestEnvelope>,
        /// Channel used by the `disconnect_now` family of APIs.
        immediate_disconnect: Sender<RequestEnvelope>,
    },
}
140
141fn into_request(envelope: RequestEnvelope) -> Request {
142 let (request, _notice) = envelope.into_parts();
143 request
144}
145
146fn map_send_envelope_error(err: SendError<RequestEnvelope>) -> ClientError {
147 ClientError::Request(into_request(err.into_inner()))
148}
149
150fn map_try_send_envelope_error(err: TrySendError<RequestEnvelope>) -> ClientError {
151 match err {
152 TrySendError::Full(envelope) | TrySendError::Disconnected(envelope) => {
153 ClientError::TryRequest(into_request(envelope))
154 }
155 }
156}
157
/// Returns `true` when `request` is a `Request::Publish`.
///
/// Used to pick between the publish channel and the control channel.
const fn is_publish_request(request: &Request) -> bool {
    matches!(request, Request::Publish(_))
}
161
/// An acknowledgement prepared for an incoming publish, to be sent manually
/// via the `manual_ack` / `try_manual_ack` APIs.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ManualAck {
    /// Acknowledgement prepared for a `QoS::AtLeastOnce` publish.
    PubAck(PubAck),
    /// Acknowledgement prepared for a `QoS::ExactlyOnce` publish.
    PubRec(PubRec),
}
168
169impl ManualAck {
170 const fn into_request(self) -> Request {
171 match self {
172 Self::PubAck(ack) => Request::PubAck(ack),
173 Self::PubRec(rec) => Request::PubRec(rec),
174 }
175 }
176}
177
/// Asynchronous handle for sending MQTT requests to the event loop.
///
/// The handle is `Clone`; clones share the same underlying sender(s).
#[derive(Clone, Debug)]
pub struct AsyncClient {
    // Channel(s) used to hand requests to the event loop.
    request_tx: RequestSender,
}
204
/// Builder for the blocking [`Client`] and its [`Connection`].
#[derive(Debug)]
pub struct ClientBuilder {
    // Connection options handed to the event loop on `build`.
    options: MqttOptions,
    // Capacity of the request channel(s) created on `build`.
    capacity: RequestChannelCapacity,
}
215
/// Builder for an [`AsyncClient`] and its [`EventLoop`].
#[derive(Debug)]
pub struct AsyncClientBuilder {
    // Connection options handed to the event loop on `build`.
    options: MqttOptions,
    // Capacity of the request channel(s) created on `build`.
    capacity: RequestChannelCapacity,
}
226
227#[must_use]
228fn build_async_client(
229 options: MqttOptions,
230 capacity: RequestChannelCapacity,
231) -> (AsyncClient, EventLoop) {
232 let (eventloop, request_tx, control_request_tx, immediate_disconnect_tx) =
233 EventLoop::new_for_async_client_with_capacity(options, capacity);
234 let client = AsyncClient {
235 request_tx: RequestSender::WithNotice {
236 requests: request_tx,
237 control_requests: control_request_tx,
238 immediate_disconnect: immediate_disconnect_tx,
239 },
240 };
241
242 (client, eventloop)
243}
244
245impl ClientBuilder {
246 #[must_use]
248 pub const fn new(options: MqttOptions) -> Self {
249 let capacity = RequestChannelCapacity::Bounded(options.request_channel_capacity());
250 Self { options, capacity }
251 }
252
253 #[must_use]
258 pub const fn capacity(mut self, cap: usize) -> Self {
259 self.capacity = RequestChannelCapacity::Bounded(cap);
260 self
261 }
262
263 #[must_use]
265 pub const fn unbounded(mut self) -> Self {
266 self.capacity = RequestChannelCapacity::Unbounded;
267 self
268 }
269
270 #[must_use]
279 pub fn build(self) -> (Client, Connection) {
280 let (client, eventloop) = build_async_client(self.options, self.capacity);
281 let client = Client { client };
282 let runtime = runtime::Builder::new_current_thread()
283 .enable_all()
284 .build()
285 .unwrap();
286
287 let connection = Connection::new(eventloop, runtime);
288 (client, connection)
289 }
290}
291
292impl AsyncClientBuilder {
293 #[must_use]
295 pub const fn new(options: MqttOptions) -> Self {
296 let capacity = RequestChannelCapacity::Bounded(options.request_channel_capacity());
297 Self { options, capacity }
298 }
299
300 #[must_use]
305 pub const fn capacity(mut self, cap: usize) -> Self {
306 self.capacity = RequestChannelCapacity::Bounded(cap);
307 self
308 }
309
310 #[must_use]
312 pub const fn unbounded(mut self) -> Self {
313 self.capacity = RequestChannelCapacity::Unbounded;
314 self
315 }
316
317 #[must_use]
322 pub fn build(self) -> (AsyncClient, EventLoop) {
323 build_async_client(self.options, self.capacity)
324 }
325}
326
327impl AsyncClient {
328 #[must_use]
334 pub const fn builder(options: MqttOptions) -> AsyncClientBuilder {
335 AsyncClientBuilder::new(options)
336 }
337
338 #[must_use]
343 pub const fn from_senders(request_tx: Sender<Request>) -> Self {
344 Self {
345 request_tx: RequestSender::Plain(request_tx),
346 }
347 }
348
349 async fn send_request_async(&self, request: Request) -> Result<(), ClientError> {
350 match &self.request_tx {
351 RequestSender::Plain(tx) => tx.send_async(request).await.map_err(ClientError::from),
352 RequestSender::WithNotice {
353 requests,
354 control_requests,
355 ..
356 } => {
357 let tx = if is_publish_request(&request) {
358 requests
359 } else {
360 control_requests
361 };
362 tx.send_async(RequestEnvelope::plain(request))
363 .await
364 .map_err(map_send_envelope_error)
365 }
366 }
367 }
368
369 fn try_send_request(&self, request: Request) -> Result<(), ClientError> {
370 match &self.request_tx {
371 RequestSender::Plain(tx) => tx.try_send(request).map_err(ClientError::from),
372 RequestSender::WithNotice {
373 requests,
374 control_requests,
375 ..
376 } => {
377 let tx = if is_publish_request(&request) {
378 requests
379 } else {
380 control_requests
381 };
382 tx.try_send(RequestEnvelope::plain(request))
383 .map_err(map_try_send_envelope_error)
384 }
385 }
386 }
387
388 fn send_request(&self, request: Request) -> Result<(), ClientError> {
389 match &self.request_tx {
390 RequestSender::Plain(tx) => tx.send(request).map_err(ClientError::from),
391 RequestSender::WithNotice {
392 requests,
393 control_requests,
394 ..
395 } => {
396 let tx = if is_publish_request(&request) {
397 requests
398 } else {
399 control_requests
400 };
401 tx.send(RequestEnvelope::plain(request))
402 .map_err(map_send_envelope_error)
403 }
404 }
405 }
406
407 async fn send_immediate_disconnect_async(&self, request: Request) -> Result<(), ClientError> {
408 match &self.request_tx {
409 RequestSender::Plain(tx) => tx.send_async(request).await.map_err(ClientError::from),
410 RequestSender::WithNotice {
411 immediate_disconnect,
412 ..
413 } => immediate_disconnect
414 .send_async(RequestEnvelope::plain(request))
415 .await
416 .map_err(map_send_envelope_error),
417 }
418 }
419
420 fn try_send_immediate_disconnect(&self, request: Request) -> Result<(), ClientError> {
421 match &self.request_tx {
422 RequestSender::Plain(tx) => tx.try_send(request).map_err(ClientError::from),
423 RequestSender::WithNotice {
424 immediate_disconnect,
425 ..
426 } => immediate_disconnect
427 .try_send(RequestEnvelope::plain(request))
428 .map_err(map_try_send_envelope_error),
429 }
430 }
431
432 fn send_immediate_disconnect(&self, request: Request) -> Result<(), ClientError> {
433 match &self.request_tx {
434 RequestSender::Plain(tx) => tx.send(request).map_err(ClientError::from),
435 RequestSender::WithNotice {
436 immediate_disconnect,
437 ..
438 } => immediate_disconnect
439 .send(RequestEnvelope::plain(request))
440 .map_err(map_send_envelope_error),
441 }
442 }
443
444 async fn send_tracked_publish_async(
445 &self,
446 publish: Publish,
447 ) -> Result<PublishNotice, ClientError> {
448 let RequestSender::WithNotice {
449 requests: request_tx,
450 ..
451 } = &self.request_tx
452 else {
453 return Err(ClientError::TrackingUnavailable);
454 };
455
456 let (notice_tx, notice) = PublishNoticeTx::new();
457 request_tx
458 .send_async(RequestEnvelope::tracked_publish(publish, notice_tx))
459 .await
460 .map_err(map_send_envelope_error)?;
461 Ok(notice)
462 }
463
464 fn try_send_tracked_publish(&self, publish: Publish) -> Result<PublishNotice, ClientError> {
465 let RequestSender::WithNotice {
466 requests: request_tx,
467 ..
468 } = &self.request_tx
469 else {
470 return Err(ClientError::TrackingUnavailable);
471 };
472
473 let (notice_tx, notice) = PublishNoticeTx::new();
474 request_tx
475 .try_send(RequestEnvelope::tracked_publish(publish, notice_tx))
476 .map_err(map_try_send_envelope_error)?;
477 Ok(notice)
478 }
479
480 async fn send_tracked_subscribe_async(
481 &self,
482 subscribe: Subscribe,
483 ) -> Result<SubscribeNotice, ClientError> {
484 let RequestSender::WithNotice {
485 control_requests: request_tx,
486 ..
487 } = &self.request_tx
488 else {
489 return Err(ClientError::TrackingUnavailable);
490 };
491
492 let (notice_tx, notice) = SubscribeNoticeTx::new();
493 request_tx
494 .send_async(RequestEnvelope::tracked_subscribe(subscribe, notice_tx))
495 .await
496 .map_err(map_send_envelope_error)?;
497 Ok(notice)
498 }
499
500 fn try_send_tracked_subscribe(
501 &self,
502 subscribe: Subscribe,
503 ) -> Result<SubscribeNotice, ClientError> {
504 let RequestSender::WithNotice {
505 control_requests: request_tx,
506 ..
507 } = &self.request_tx
508 else {
509 return Err(ClientError::TrackingUnavailable);
510 };
511
512 let (notice_tx, notice) = SubscribeNoticeTx::new();
513 request_tx
514 .try_send(RequestEnvelope::tracked_subscribe(subscribe, notice_tx))
515 .map_err(map_try_send_envelope_error)?;
516 Ok(notice)
517 }
518
519 async fn send_tracked_unsubscribe_async(
520 &self,
521 unsubscribe: Unsubscribe,
522 ) -> Result<UnsubscribeNotice, ClientError> {
523 let RequestSender::WithNotice {
524 control_requests: request_tx,
525 ..
526 } = &self.request_tx
527 else {
528 return Err(ClientError::TrackingUnavailable);
529 };
530
531 let (notice_tx, notice) = UnsubscribeNoticeTx::new();
532 request_tx
533 .send_async(RequestEnvelope::tracked_unsubscribe(unsubscribe, notice_tx))
534 .await
535 .map_err(map_send_envelope_error)?;
536 Ok(notice)
537 }
538
539 fn try_send_tracked_unsubscribe(
540 &self,
541 unsubscribe: Unsubscribe,
542 ) -> Result<UnsubscribeNotice, ClientError> {
543 let RequestSender::WithNotice {
544 control_requests: request_tx,
545 ..
546 } = &self.request_tx
547 else {
548 return Err(ClientError::TrackingUnavailable);
549 };
550
551 let (notice_tx, notice) = UnsubscribeNoticeTx::new();
552 request_tx
553 .try_send(RequestEnvelope::tracked_unsubscribe(unsubscribe, notice_tx))
554 .map_err(map_try_send_envelope_error)?;
555 Ok(notice)
556 }
557
558 async fn handle_publish<T, V>(
560 &self,
561 topic: T,
562 qos: QoS,
563 retain: bool,
564 payload: V,
565 ) -> Result<(), ClientError>
566 where
567 T: Into<PublishTopic>,
568 V: Into<Vec<u8>>,
569 {
570 let (topic, needs_validation) = topic.into().into_string_and_validation();
571 let invalid_topic = needs_validation && !valid_publish_topic(&topic);
572 let mut publish = Publish::new(topic, qos, payload);
573 publish.retain = retain;
574 let publish = Request::Publish(publish);
575
576 if invalid_topic {
577 return Err(ClientError::Request(publish));
578 }
579
580 self.send_request_async(publish).await?;
581 Ok(())
582 }
583
584 async fn handle_publish_tracked<T, V>(
585 &self,
586 topic: T,
587 qos: QoS,
588 retain: bool,
589 payload: V,
590 ) -> Result<PublishNotice, ClientError>
591 where
592 T: Into<PublishTopic>,
593 V: Into<Vec<u8>>,
594 {
595 let (topic, needs_validation) = topic.into().into_string_and_validation();
596 let invalid_topic = needs_validation && !valid_publish_topic(&topic);
597 let mut publish = Publish::new(topic, qos, payload);
598 publish.retain = retain;
599 let request = Request::Publish(publish.clone());
600
601 if invalid_topic {
602 return Err(ClientError::Request(request));
603 }
604
605 self.send_tracked_publish_async(publish).await
606 }
607
608 pub async fn publish<T, V>(
615 &self,
616 topic: T,
617 qos: QoS,
618 retain: bool,
619 payload: V,
620 ) -> Result<(), ClientError>
621 where
622 T: Into<PublishTopic>,
623 V: Into<Vec<u8>>,
624 {
625 self.handle_publish(topic, qos, retain, payload).await
626 }
627
628 pub async fn publish_tracked<T, V>(
635 &self,
636 topic: T,
637 qos: QoS,
638 retain: bool,
639 payload: V,
640 ) -> Result<PublishNotice, ClientError>
641 where
642 T: Into<PublishTopic>,
643 V: Into<Vec<u8>>,
644 {
645 self.handle_publish_tracked(topic, qos, retain, payload)
646 .await
647 }
648
649 fn handle_try_publish<T, V>(
651 &self,
652 topic: T,
653 qos: QoS,
654 retain: bool,
655 payload: V,
656 ) -> Result<(), ClientError>
657 where
658 T: Into<PublishTopic>,
659 V: Into<Vec<u8>>,
660 {
661 let (topic, needs_validation) = topic.into().into_string_and_validation();
662 let invalid_topic = needs_validation && !valid_publish_topic(&topic);
663 let mut publish = Publish::new(topic, qos, payload);
664 publish.retain = retain;
665 let publish = Request::Publish(publish);
666
667 if invalid_topic {
668 return Err(ClientError::TryRequest(publish));
669 }
670
671 self.try_send_request(publish)?;
672 Ok(())
673 }
674
675 fn handle_try_publish_tracked<T, V>(
676 &self,
677 topic: T,
678 qos: QoS,
679 retain: bool,
680 payload: V,
681 ) -> Result<PublishNotice, ClientError>
682 where
683 T: Into<PublishTopic>,
684 V: Into<Vec<u8>>,
685 {
686 let (topic, needs_validation) = topic.into().into_string_and_validation();
687 let invalid_topic = needs_validation && !valid_publish_topic(&topic);
688 let mut publish = Publish::new(topic, qos, payload);
689 publish.retain = retain;
690 let request = Request::Publish(publish.clone());
691
692 if invalid_topic {
693 return Err(ClientError::TryRequest(request));
694 }
695
696 self.try_send_tracked_publish(publish)
697 }
698
699 pub fn try_publish<T, V>(
710 &self,
711 topic: T,
712 qos: QoS,
713 retain: bool,
714 payload: V,
715 ) -> Result<(), ClientError>
716 where
717 T: Into<PublishTopic>,
718 V: Into<Vec<u8>>,
719 {
720 self.handle_try_publish(topic, qos, retain, payload)
721 }
722
723 pub fn try_publish_tracked<T, V>(
734 &self,
735 topic: T,
736 qos: QoS,
737 retain: bool,
738 payload: V,
739 ) -> Result<PublishNotice, ClientError>
740 where
741 T: Into<PublishTopic>,
742 V: Into<Vec<u8>>,
743 {
744 self.handle_try_publish_tracked(topic, qos, retain, payload)
745 }
746
747 pub const fn prepare_ack(&self, publish: &Publish) -> Option<ManualAck> {
751 prepare_ack(publish)
752 }
753
754 pub async fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
763 self.send_request_async(ack.into_request()).await?;
764 Ok(())
765 }
766
767 pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
774 self.try_send_request(ack.into_request())?;
775 Ok(())
776 }
777
778 pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
786 if let Some(ack) = self.prepare_ack(publish) {
787 self.manual_ack(ack).await?;
788 }
789 Ok(())
790 }
791
792 pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
800 if let Some(ack) = self.prepare_ack(publish) {
801 self.try_manual_ack(ack)?;
802 }
803 Ok(())
804 }
805
806 async fn handle_publish_bytes<T>(
808 &self,
809 topic: T,
810 qos: QoS,
811 retain: bool,
812 payload: Bytes,
813 ) -> Result<(), ClientError>
814 where
815 T: Into<PublishTopic>,
816 {
817 let (topic, needs_validation) = topic.into().into_string_and_validation();
818 let invalid_topic = needs_validation && !valid_publish_topic(&topic);
819 let mut publish = Publish::from_bytes(topic, qos, payload);
820 publish.retain = retain;
821 let publish = Request::Publish(publish);
822
823 if invalid_topic {
824 return Err(ClientError::Request(publish));
825 }
826
827 self.send_request_async(publish).await?;
828 Ok(())
829 }
830
831 async fn handle_publish_bytes_tracked<T>(
832 &self,
833 topic: T,
834 qos: QoS,
835 retain: bool,
836 payload: Bytes,
837 ) -> Result<PublishNotice, ClientError>
838 where
839 T: Into<PublishTopic>,
840 {
841 let (topic, needs_validation) = topic.into().into_string_and_validation();
842 let invalid_topic = needs_validation && !valid_publish_topic(&topic);
843 let mut publish = Publish::from_bytes(topic, qos, payload);
844 publish.retain = retain;
845 let request = Request::Publish(publish.clone());
846
847 if invalid_topic {
848 return Err(ClientError::Request(request));
849 }
850
851 self.send_tracked_publish_async(publish).await
852 }
853
854 pub async fn publish_bytes<T>(
861 &self,
862 topic: T,
863 qos: QoS,
864 retain: bool,
865 payload: Bytes,
866 ) -> Result<(), ClientError>
867 where
868 T: Into<PublishTopic>,
869 {
870 self.handle_publish_bytes(topic, qos, retain, payload).await
871 }
872
873 pub async fn publish_bytes_tracked<T>(
880 &self,
881 topic: T,
882 qos: QoS,
883 retain: bool,
884 payload: Bytes,
885 ) -> Result<PublishNotice, ClientError>
886 where
887 T: Into<PublishTopic>,
888 {
889 self.handle_publish_bytes_tracked(topic, qos, retain, payload)
890 .await
891 }
892
893 pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
900 let subscribe = Subscribe::new(topic, qos);
901 if !subscribe_has_valid_filters(&subscribe) {
902 return Err(ClientError::Request(subscribe.into()));
903 }
904
905 self.send_request_async(subscribe.into()).await?;
906 Ok(())
907 }
908
909 pub async fn subscribe_tracked<S: Into<String>>(
916 &self,
917 topic: S,
918 qos: QoS,
919 ) -> Result<SubscribeNotice, ClientError> {
920 let subscribe = Subscribe::new(topic, qos);
921 if !subscribe_has_valid_filters(&subscribe) {
922 return Err(ClientError::Request(subscribe.into()));
923 }
924
925 self.send_tracked_subscribe_async(subscribe).await
926 }
927
928 pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
935 let subscribe = Subscribe::new(topic, qos);
936 if !subscribe_has_valid_filters(&subscribe) {
937 return Err(ClientError::TryRequest(subscribe.into()));
938 }
939
940 self.try_send_request(subscribe.into())?;
941 Ok(())
942 }
943
944 pub fn try_subscribe_tracked<S: Into<String>>(
951 &self,
952 topic: S,
953 qos: QoS,
954 ) -> Result<SubscribeNotice, ClientError> {
955 let subscribe = Subscribe::new(topic, qos);
956 if !subscribe_has_valid_filters(&subscribe) {
957 return Err(ClientError::TryRequest(subscribe.into()));
958 }
959
960 self.try_send_tracked_subscribe(subscribe)
961 }
962
963 pub async fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
970 where
971 T: IntoIterator<Item = SubscribeFilter>,
972 {
973 let subscribe = Subscribe::new_many(topics);
974 if !subscribe_has_valid_filters(&subscribe) {
975 return Err(ClientError::Request(subscribe.into()));
976 }
977
978 self.send_request_async(subscribe.into()).await?;
979 Ok(())
980 }
981
982 pub async fn subscribe_many_tracked<T>(&self, topics: T) -> Result<SubscribeNotice, ClientError>
989 where
990 T: IntoIterator<Item = SubscribeFilter>,
991 {
992 let subscribe = Subscribe::new_many(topics);
993 if !subscribe_has_valid_filters(&subscribe) {
994 return Err(ClientError::Request(subscribe.into()));
995 }
996
997 self.send_tracked_subscribe_async(subscribe).await
998 }
999
1000 pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
1007 where
1008 T: IntoIterator<Item = SubscribeFilter>,
1009 {
1010 let subscribe = Subscribe::new_many(topics);
1011 if !subscribe_has_valid_filters(&subscribe) {
1012 return Err(ClientError::TryRequest(subscribe.into()));
1013 }
1014 self.try_send_request(subscribe.into())?;
1015 Ok(())
1016 }
1017
1018 pub fn try_subscribe_many_tracked<T>(&self, topics: T) -> Result<SubscribeNotice, ClientError>
1025 where
1026 T: IntoIterator<Item = SubscribeFilter>,
1027 {
1028 let subscribe = Subscribe::new_many(topics);
1029 if !subscribe_has_valid_filters(&subscribe) {
1030 return Err(ClientError::TryRequest(subscribe.into()));
1031 }
1032
1033 self.try_send_tracked_subscribe(subscribe)
1034 }
1035
1036 pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
1042 let unsubscribe = Unsubscribe::new(topic.into());
1043 let request = Request::Unsubscribe(unsubscribe);
1044 self.send_request_async(request).await?;
1045 Ok(())
1046 }
1047
1048 pub async fn unsubscribe_tracked<S: Into<String>>(
1054 &self,
1055 topic: S,
1056 ) -> Result<UnsubscribeNotice, ClientError> {
1057 let unsubscribe = Unsubscribe::new(topic.into());
1058 self.send_tracked_unsubscribe_async(unsubscribe).await
1059 }
1060
1061 pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
1068 let unsubscribe = Unsubscribe::new(topic.into());
1069 let request = Request::Unsubscribe(unsubscribe);
1070 self.try_send_request(request)?;
1071 Ok(())
1072 }
1073
1074 pub fn try_unsubscribe_tracked<S: Into<String>>(
1081 &self,
1082 topic: S,
1083 ) -> Result<UnsubscribeNotice, ClientError> {
1084 let unsubscribe = Unsubscribe::new(topic.into());
1085 self.try_send_tracked_unsubscribe(unsubscribe)
1086 }
1087
1088 pub async fn disconnect(&self) -> Result<(), ClientError> {
1108 let request = Request::Disconnect(Disconnect);
1109 self.send_request_async(request).await?;
1110 Ok(())
1111 }
1112
1113 pub async fn disconnect_with_timeout(&self, timeout: Duration) -> Result<(), ClientError> {
1136 let request = Request::DisconnectWithTimeout(Disconnect, timeout);
1137 self.send_request_async(request).await?;
1138 Ok(())
1139 }
1140
1141 pub async fn disconnect_now(&self) -> Result<(), ClientError> {
1152 let request = Request::DisconnectNow(Disconnect);
1153 self.send_immediate_disconnect_async(request).await?;
1154 Ok(())
1155 }
1156
1157 pub fn try_disconnect(&self) -> Result<(), ClientError> {
1175 let request = Request::Disconnect(Disconnect);
1176 self.try_send_request(request)?;
1177 Ok(())
1178 }
1179
1180 pub fn try_disconnect_with_timeout(&self, timeout: Duration) -> Result<(), ClientError> {
1203 let request = Request::DisconnectWithTimeout(Disconnect, timeout);
1204 self.try_send_request(request)?;
1205 Ok(())
1206 }
1207
1208 pub fn try_disconnect_now(&self) -> Result<(), ClientError> {
1219 let request = Request::DisconnectNow(Disconnect);
1220 self.try_send_immediate_disconnect(request)?;
1221 Ok(())
1222 }
1223}
1224
1225const fn prepare_ack(publish: &Publish) -> Option<ManualAck> {
1226 let ack = match publish.qos {
1227 QoS::AtMostOnce => return None,
1228 QoS::AtLeastOnce => ManualAck::PubAck(PubAck::new(publish.pkid)),
1229 QoS::ExactlyOnce => ManualAck::PubRec(PubRec::new(publish.pkid)),
1230 };
1231 Some(ack)
1232}
1233
/// Blocking handle for sending MQTT requests to the event loop.
///
/// Wraps an [`AsyncClient`]: the non-`try_*` methods use the synchronous
/// channel `send`, while the `try_*` methods delegate to the wrapped client.
#[derive(Clone)]
pub struct Client {
    // The async client whose senders are reused for the blocking API.
    client: AsyncClient,
}
1248
1249impl Client {
1250 #[must_use]
1256 pub const fn builder(options: MqttOptions) -> ClientBuilder {
1257 ClientBuilder::new(options)
1258 }
1259
1260 #[must_use]
1265 pub const fn from_sender(request_tx: Sender<Request>) -> Self {
1266 Self {
1267 client: AsyncClient::from_senders(request_tx),
1268 }
1269 }
1270
1271 pub fn publish<T, V>(
1278 &self,
1279 topic: T,
1280 qos: QoS,
1281 retain: bool,
1282 payload: V,
1283 ) -> Result<(), ClientError>
1284 where
1285 T: Into<PublishTopic>,
1286 V: Into<Vec<u8>>,
1287 {
1288 let (topic, needs_validation) = topic.into().into_string_and_validation();
1289 let invalid_topic = needs_validation && !valid_publish_topic(&topic);
1290 let mut publish = Publish::new(topic, qos, payload);
1291 publish.retain = retain;
1292 let publish = Request::Publish(publish);
1293 if invalid_topic {
1294 return Err(ClientError::Request(publish));
1295 }
1296 self.client.send_request(publish)?;
1297 Ok(())
1298 }
1299
1300 pub fn try_publish<T, V>(
1307 &self,
1308 topic: T,
1309 qos: QoS,
1310 retain: bool,
1311 payload: V,
1312 ) -> Result<(), ClientError>
1313 where
1314 T: Into<PublishTopic>,
1315 V: Into<Vec<u8>>,
1316 {
1317 self.client.try_publish(topic, qos, retain, payload)?;
1318 Ok(())
1319 }
1320
1321 pub const fn prepare_ack(&self, publish: &Publish) -> Option<ManualAck> {
1325 self.client.prepare_ack(publish)
1326 }
1327
1328 pub fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
1335 self.client.send_request(ack.into_request())?;
1336 Ok(())
1337 }
1338
1339 pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
1346 self.client.try_manual_ack(ack)?;
1347 Ok(())
1348 }
1349
1350 pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
1358 if let Some(ack) = self.prepare_ack(publish) {
1359 self.manual_ack(ack)?;
1360 }
1361 Ok(())
1362 }
1363
1364 pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
1372 if let Some(ack) = self.prepare_ack(publish) {
1373 self.try_manual_ack(ack)?;
1374 }
1375 Ok(())
1376 }
1377
1378 pub fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
1385 let subscribe = Subscribe::new(topic, qos);
1386 if !subscribe_has_valid_filters(&subscribe) {
1387 return Err(ClientError::Request(subscribe.into()));
1388 }
1389
1390 self.client.send_request(subscribe.into())?;
1391 Ok(())
1392 }
1393
1394 pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
1401 self.client.try_subscribe(topic, qos)?;
1402 Ok(())
1403 }
1404
1405 pub fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
1412 where
1413 T: IntoIterator<Item = SubscribeFilter>,
1414 {
1415 let subscribe = Subscribe::new_many(topics);
1416 if !subscribe_has_valid_filters(&subscribe) {
1417 return Err(ClientError::Request(subscribe.into()));
1418 }
1419
1420 self.client.send_request(subscribe.into())?;
1421 Ok(())
1422 }
1423
1424 pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
1431 where
1432 T: IntoIterator<Item = SubscribeFilter>,
1433 {
1434 self.client.try_subscribe_many(topics)
1435 }
1436
1437 pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
1443 let unsubscribe = Unsubscribe::new(topic.into());
1444 let request = Request::Unsubscribe(unsubscribe);
1445 self.client.send_request(request)?;
1446 Ok(())
1447 }
1448
1449 pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
1456 self.client.try_unsubscribe(topic)?;
1457 Ok(())
1458 }
1459
1460 pub fn disconnect(&self) -> Result<(), ClientError> {
1478 let request = Request::Disconnect(Disconnect);
1479 self.client.send_request(request)?;
1480 Ok(())
1481 }
1482
1483 pub fn disconnect_with_timeout(&self, timeout: Duration) -> Result<(), ClientError> {
1506 let request = Request::DisconnectWithTimeout(Disconnect, timeout);
1507 self.client.send_request(request)?;
1508 Ok(())
1509 }
1510
1511 pub fn disconnect_now(&self) -> Result<(), ClientError> {
1522 let request = Request::DisconnectNow(Disconnect);
1523 self.client.send_immediate_disconnect(request)?;
1524 Ok(())
1525 }
1526
1527 pub fn try_disconnect(&self) -> Result<(), ClientError> {
1545 self.client.try_disconnect()?;
1546 Ok(())
1547 }
1548
1549 pub fn try_disconnect_with_timeout(&self, timeout: Duration) -> Result<(), ClientError> {
1572 self.client.try_disconnect_with_timeout(timeout)?;
1573 Ok(())
1574 }
1575
1576 pub fn try_disconnect_now(&self) -> Result<(), ClientError> {
1587 self.client.try_disconnect_now()?;
1588 Ok(())
1589 }
1590}
1591
1592#[must_use]
1593fn valid_publish_topic(topic: &str) -> bool {
1594 !topic.is_empty() && valid_topic(topic)
1595}
1596
1597#[must_use]
1598fn subscribe_has_valid_filters(subscribe: &Subscribe) -> bool {
1599 !subscribe.filters.is_empty()
1600 && subscribe
1601 .filters
1602 .iter()
1603 .all(|filter| valid_filter(&filter.path))
1604}
1605
/// Error returned by [`Connection::recv`] when the event loop reports
/// `ConnectionError::RequestsDone`.
#[derive(Debug, Eq, PartialEq)]
pub struct RecvError;
1609
/// Error returned by [`Connection::try_recv`].
#[derive(Debug, Eq, PartialEq)]
pub enum TryRecvError {
    /// The event loop reported `ConnectionError::RequestsDone`.
    Disconnected,
    /// The event loop's poll future was not immediately ready.
    Empty,
}
1618
/// Error returned by [`Connection::recv_timeout`].
#[derive(Debug, Eq, PartialEq)]
pub enum RecvTimeoutError {
    /// The event loop reported `ConnectionError::RequestsDone`.
    Disconnected,
    /// The timeout elapsed before the event loop's poll future completed.
    Timeout,
}
1627
/// Blocking wrapper that pairs an [`EventLoop`] with the tokio runtime used
/// to drive it.
pub struct Connection {
    /// The wrapped event loop, polled by the `recv*` methods.
    pub eventloop: EventLoop,
    // Runtime on which the event loop's poll future is executed.
    runtime: Runtime,
}
1633impl Connection {
1634 const fn new(eventloop: EventLoop, runtime: Runtime) -> Self {
1635 Self { eventloop, runtime }
1636 }
1637
1638 #[must_use = "Connection should be iterated over a loop to make progress"]
1645 pub const fn iter(&mut self) -> Iter<'_> {
1646 Iter { connection: self }
1647 }
1648
1649 pub fn recv(&mut self) -> Result<Result<Event, ConnectionError>, RecvError> {
1658 let f = self.eventloop.poll();
1659 let event = self.runtime.block_on(f);
1660
1661 resolve_event(event).ok_or(RecvError)
1662 }
1663
1664 pub fn try_recv(&mut self) -> Result<Result<Event, ConnectionError>, TryRecvError> {
1674 let f = self.eventloop.poll();
1675 let _guard = self.runtime.enter();
1678 let event = f.now_or_never().ok_or(TryRecvError::Empty)?;
1679
1680 resolve_event(event).ok_or(TryRecvError::Disconnected)
1681 }
1682
1683 pub fn recv_timeout(
1694 &mut self,
1695 duration: Duration,
1696 ) -> Result<Result<Event, ConnectionError>, RecvTimeoutError> {
1697 let f = self.eventloop.poll();
1698 let event = self
1699 .runtime
1700 .block_on(async { timeout(duration, f).await })
1701 .map_err(|_| RecvTimeoutError::Timeout)?;
1702
1703 resolve_event(event).ok_or(RecvTimeoutError::Disconnected)
1704 }
1705}
1706
1707fn resolve_event(event: Result<Event, ConnectionError>) -> Option<Result<Event, ConnectionError>> {
1708 match event {
1709 Ok(v) => Some(Ok(v)),
1710 Err(ConnectionError::RequestsDone) => {
1712 trace!("Done with requests");
1713 None
1714 }
1715 Err(e) => Some(Err(e)),
1716 }
1717}
1718
/// Iterator over the events of a [`Connection`], created by
/// [`Connection::iter`].
pub struct Iter<'a> {
    // Connection that is polled for each item.
    connection: &'a mut Connection,
}
1723
1724impl Iterator for Iter<'_> {
1725 type Item = Result<Event, ConnectionError>;
1726
1727 fn next(&mut self) -> Option<Self::Item> {
1728 self.connection.recv().ok()
1729 }
1730}
1731
1732#[cfg(test)]
1733mod test {
1734 use super::*;
1735 use crate::LastWill;
1736
/// Requesting the blocking iterator more than once must not panic.
#[test]
fn calling_iter_twice_on_connection_shouldnt_panic() {
    let last_will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false);
    let mut options = MqttOptions::new("test-1", "localhost");
    options.set_keep_alive(5).set_last_will(last_will);

    let (_, mut connection) = Client::builder(options).capacity(10).build();
    for _ in 0..2 {
        let _ = connection.iter();
    }
}
1747
/// Without an explicit builder capacity, the capacity set on the options applies.
#[test]
fn builder_uses_options_request_channel_capacity_by_default() {
    let mut options = MqttOptions::new("test-1", "localhost");
    options.set_request_channel_capacity(1);
    let builder: AsyncClientBuilder = AsyncClient::builder(options);
    let (client, _eventloop) = builder.build();

    let first = client.try_publish("hello/world", QoS::AtMostOnce, false, "one");
    first.expect("first request should fit configured capacity");

    // The channel holds one request and nothing polls it, so the next one is refused.
    let second = client.try_publish("hello/world", QoS::AtMostOnce, false, "two");
    assert!(matches!(
        second,
        Err(ClientError::TryRequest(Request::Publish(_)))
    ));
}
1763
/// Compile-time check: each entry point returns its own builder type.
#[test]
fn sync_and_async_entry_points_return_distinct_builder_types() {
    let _sync_builder: ClientBuilder =
        Client::builder(MqttOptions::new("test-sync", "localhost"));
    let _async_builder: AsyncClientBuilder =
        AsyncClient::builder(MqttOptions::new("test-async", "localhost"));
}
1772
/// A capacity given to the builder takes precedence over the one on the options.
#[test]
fn builder_capacity_overrides_options_request_channel_capacity() {
    let mut options = MqttOptions::new("test-1", "localhost");
    options.set_request_channel_capacity(1);
    let (client, _eventloop) = Client::builder(options).capacity(2).build();

    let accepted = [
        ("one", "first request should fit overridden capacity"),
        ("two", "second request should fit overridden capacity"),
    ];
    for (payload, reason) in accepted {
        client
            .try_publish("hello/world", QoS::AtMostOnce, false, payload)
            .expect(reason);
    }

    // Two requests are queued and nothing polls them, so the third is refused.
    let third = client.try_publish("hello/world", QoS::AtMostOnce, false, "three");
    assert!(matches!(
        third,
        Err(ClientError::TryRequest(Request::Publish(_)))
    ));
}
1790
/// With capacity zero, `try_publish` is refused while nothing polls the event loop.
#[test]
fn builder_capacity_zero_is_bounded_rendezvous() {
    let options = MqttOptions::new("test-1", "localhost");
    let (client, _eventloop) = AsyncClient::builder(options).capacity(0).build();

    let attempt = client.try_publish("hello/world", QoS::AtMostOnce, false, "one");
    assert!(matches!(
        attempt,
        Err(ClientError::TryRequest(Request::Publish(_)))
    ));
}
1801
/// An unbounded request channel accepts many `try_publish` calls with no polling.
#[test]
fn unbounded_builder_allows_try_publish_without_polling() {
    let options = MqttOptions::new("test-1", "localhost");
    let (client, _eventloop) = AsyncClient::builder(options).unbounded().build();

    for byte in 0..128 {
        let payload = vec![byte];
        let queued = client.try_publish("hello/world", QoS::AtMostOnce, false, payload);
        queued.expect("unbounded channel should accept requests without polling");
    }
}
1813
1814 #[tokio::test]
1815 async fn bounded_publish_blocks_when_channel_is_full_without_polling() {
1816 let mqttoptions = MqttOptions::new("test-1", "localhost");
1817 let (client, _eventloop) = AsyncClient::builder(mqttoptions).capacity(1).build();
1818
1819 client
1820 .publish("hello/world", QoS::AtMostOnce, false, "one")
1821 .await
1822 .expect("first request should fit bounded channel");
1823
1824 let result = tokio::time::timeout(
1825 std::time::Duration::from_millis(25),
1826 client.publish("hello/world", QoS::AtMostOnce, false, "two"),
1827 )
1828 .await;
1829 assert!(result.is_err());
1830 }
1831
1832 #[tokio::test]
1833 async fn unbounded_publish_completes_without_polling() {
1834 let mqttoptions = MqttOptions::new("test-1", "localhost");
1835 let (client, _eventloop) = AsyncClient::builder(mqttoptions).unbounded().build();
1836
1837 for i in 0..128 {
1838 client
1839 .publish("hello/world", QoS::AtMostOnce, false, vec![i])
1840 .await
1841 .expect("unbounded channel should accept requests without polling");
1842 }
1843 }
1844
/// A client built from a bare sender delivers its publish request to the receiver.
#[test]
fn should_be_able_to_build_test_client_from_channel() {
    let (tx, rx) = flume::bounded(1);
    let client = Client::from_sender(tx);

    let sent = client.publish("hello/world", QoS::ExactlyOnce, false, "good bye");
    sent.expect("Should be able to publish");

    let _ = rx.try_recv().expect("Should have message");
}
1854
/// `prepare_ack` yields nothing for QoS 0, a PubAck for QoS 1 and a PubRec for QoS 2.
#[test]
fn prepare_ack_maps_qos_to_manual_ack_packets_v4() {
    let (tx, _) = flume::bounded(1);
    let client = Client::from_sender(tx);

    let at_most_once = Publish::new("hello/world", QoS::AtMostOnce, vec![1]);
    assert!(client.prepare_ack(&at_most_once).is_none());

    let mut at_least_once = Publish::new("hello/world", QoS::AtLeastOnce, vec![1]);
    at_least_once.pkid = 7;
    let ack = client.prepare_ack(&at_least_once);
    if let Some(ManualAck::PubAck(puback)) = &ack {
        assert_eq!(puback.pkid, 7);
    } else {
        panic!("expected QoS1 PubAck, got {ack:?}");
    }

    let mut exactly_once = Publish::new("hello/world", QoS::ExactlyOnce, vec![1]);
    exactly_once.pkid = 9;
    let ack = client.prepare_ack(&exactly_once);
    if let Some(ManualAck::PubRec(pubrec)) = &ack {
        assert_eq!(pubrec.pkid, 9);
    } else {
        panic!("expected QoS2 PubRec, got {ack:?}");
    }
}
1877
/// `manual_ack` with a PubAck enqueues a matching `Request::PubAck`.
#[test]
fn manual_ack_sends_puback_request_v4() {
    let (tx, rx) = flume::bounded(1);
    let client = Client::from_sender(tx);

    let sent = client.manual_ack(ManualAck::PubAck(PubAck::new(42)));
    sent.expect("manual_ack should send request");

    let request = rx.try_recv().expect("Should have ack request");
    if let Request::PubAck(puback) = &request {
        assert_eq!(puback.pkid, 42);
    } else {
        panic!("Expected PubAck request, got {request:?}");
    }
}
1892
/// `try_manual_ack` with a PubRec enqueues a matching `Request::PubRec`.
#[test]
fn try_manual_ack_sends_pubrec_request_v4() {
    let (tx, rx) = flume::bounded(1);
    let client = Client::from_sender(tx);

    let sent = client.try_manual_ack(ManualAck::PubRec(PubRec::new(51)));
    sent.expect("try_manual_ack should send request");

    let request = rx.try_recv().expect("Should have ack request");
    if let Request::PubRec(pubrec) = &request {
        assert_eq!(pubrec.pkid, 51);
    } else {
        panic!("Expected PubRec request, got {request:?}");
    }
}
1907
/// `ack` and `try_ack` enqueue the ack request matching the publish QoS, in call order.
#[test]
fn ack_and_try_ack_use_manual_ack_flow_v4() {
    let (tx, rx) = flume::bounded(2);
    let client = Client::from_sender(tx);

    let mut at_least_once = Publish::new("hello/world", QoS::AtLeastOnce, vec![1]);
    at_least_once.pkid = 11;
    client.ack(&at_least_once).expect("ack should send PubAck");

    let mut exactly_once = Publish::new("hello/world", QoS::ExactlyOnce, vec![1]);
    exactly_once.pkid = 13;
    let sent = client.try_ack(&exactly_once);
    sent.expect("try_ack should send PubRec request");

    let first = rx.try_recv().expect("Should receive first ack request");
    if let Request::PubAck(puback) = &first {
        assert_eq!(puback.pkid, 11);
    } else {
        panic!("Expected PubAck request, got {first:?}");
    }

    let second = rx.try_recv().expect("Should receive second ack request");
    if let Request::PubRec(pubrec) = &second {
        assert_eq!(pubrec.pkid, 13);
    } else {
        panic!("Expected PubRec request, got {second:?}");
    }
}
1935
/// The blocking `publish` accepts a `ValidatedTopic` as its topic argument.
#[test]
fn can_publish_with_validated_topic() {
    let (tx, rx) = flume::bounded(1);
    let client = Client::from_sender(tx);

    let topic = ValidatedTopic::new("hello/world").unwrap();
    let sent = client.publish(topic, QoS::ExactlyOnce, false, "good bye");
    sent.expect("Should be able to publish");

    let _ = rx.try_recv().expect("Should have message");
}
1946
/// Both `publish` and `try_publish` accept a `&String` topic.
#[test]
fn publish_accepts_borrowed_string_topic() {
    let (tx, rx) = flume::bounded(2);
    let client = Client::from_sender(tx);
    let topic = "hello/world".to_string();

    let blocking = client.publish(&topic, QoS::ExactlyOnce, false, "good bye");
    blocking.expect("Should be able to publish");
    let non_blocking = client.try_publish(&topic, QoS::ExactlyOnce, false, "good bye");
    non_blocking.expect("Should be able to publish");

    // One queued request per call above.
    for _ in 0..2 {
        let _ = rx.try_recv().expect("Should have message");
    }
}
1961
/// Both the borrowed and the owned `Cow` variant are accepted as a publish topic.
#[test]
fn publish_accepts_cow_topic_variants() {
    let (tx, rx) = flume::bounded(2);
    let client = Client::from_sender(tx);

    let borrowed = std::borrow::Cow::Borrowed("hello/world");
    let blocking = client.publish(borrowed, QoS::ExactlyOnce, false, "good bye");
    blocking.expect("Should be able to publish");

    let owned = std::borrow::Cow::Owned("hello/world".to_owned());
    let non_blocking = client.try_publish(owned, QoS::ExactlyOnce, false, "good bye");
    non_blocking.expect("Should be able to publish");

    // One queued request per call above.
    for _ in 0..2 {
        let _ = rx.try_recv().expect("Should have message");
    }
}
1985
/// A `Cow` topic containing a wildcard must be rejected by `publish`.
#[test]
fn publishing_invalid_cow_topic_fails() {
    // Keep the receiver alive. Binding it to `_` drops it immediately, and a
    // channel without receivers fails every send regardless of the topic,
    // which can mask missing topic validation in this test.
    let (tx, rx) = flume::bounded(1);
    let client = Client::from_sender(tx);

    let err = client
        .publish(
            std::borrow::Cow::Borrowed("a/+/b"),
            QoS::ExactlyOnce,
            false,
            "good bye",
        )
        .expect_err("Invalid publish topic should fail");
    assert!(matches!(err, ClientError::Request(Request::Publish(_))));

    // The rejected publish must never have reached the request channel.
    assert!(rx.is_empty());
}
2000
/// `ValidatedTopic` can be cloned and compared for equality.
#[test]
fn validated_topic_ergonomics() {
    let original = ValidatedTopic::new("hello/world").unwrap();
    let duplicate = original.clone();
    assert_eq!(original, duplicate);
}
2007
/// Wildcard and empty topics are rejected, and the error carries the offending string.
#[test]
fn creating_invalid_validated_topic_fails() {
    let wildcard = ValidatedTopic::new("a/+/b");
    assert_eq!(wildcard, Err(InvalidTopic("a/+/b".to_string())));

    let empty = ValidatedTopic::new("");
    assert_eq!(empty, Err(InvalidTopic(String::new())));
}
2016
/// Raw (unvalidated) topics that are invalid must be rejected by `publish`.
#[test]
fn publishing_invalid_raw_topic_fails() {
    // Keep the receiver alive. Binding it to `_` drops it immediately, and a
    // channel without receivers fails every send regardless of the topic,
    // which can mask missing topic validation in this test.
    let (tx, rx) = flume::bounded(1);
    let client = Client::from_sender(tx);

    let err = client
        .publish("a/+/b", QoS::ExactlyOnce, false, "good bye")
        .expect_err("Invalid publish topic should fail");
    assert!(matches!(err, ClientError::Request(Request::Publish(_))));

    let err = client
        .publish("", QoS::ExactlyOnce, false, "good bye")
        .expect_err("Empty publish topic should fail");
    assert!(matches!(err, ClientError::Request(Request::Publish(_))));

    // Neither rejected publish may have reached the request channel.
    assert!(rx.is_empty());
}
2031
/// The async `publish` and `publish_bytes` both accept a `ValidatedTopic`.
#[test]
fn async_publish_paths_accept_validated_topic() {
    let (tx, rx) = flume::bounded(2);
    let client = AsyncClient::from_senders(tx);
    let rt = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    rt.block_on(async {
        let topic = ValidatedTopic::new("hello/world").unwrap();
        let sent = client
            .publish(topic, QoS::ExactlyOnce, false, "good bye")
            .await;
        sent.expect("Should be able to publish");

        let topic = ValidatedTopic::new("hello/world").unwrap();
        let payload = Bytes::from_static(b"good bye");
        let sent = client
            .publish_bytes(topic, QoS::ExactlyOnce, false, payload)
            .await;
        sent.expect("Should be able to publish");
    });

    // One queued request per publish above.
    for _ in 0..2 {
        let _ = rx.try_recv().expect("Should have message");
    }
}
2066
/// Invalid raw topics must be rejected by the async `publish` and `publish_bytes`.
#[test]
fn async_publishing_invalid_raw_topic_fails() {
    // Keep the receiver alive. Binding it to `_` drops it immediately, and a
    // channel without receivers fails every send regardless of the topic,
    // which can mask missing topic validation in this test.
    let (tx, rx) = flume::bounded(2);
    let client = AsyncClient::from_senders(tx);
    let runtime = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    runtime.block_on(async {
        let err = client
            .publish("a/+/b", QoS::ExactlyOnce, false, "good bye")
            .await
            .expect_err("Invalid publish topic should fail");
        assert!(matches!(err, ClientError::Request(Request::Publish(_))));

        let err = client
            .publish_bytes(
                "a/+/b",
                QoS::ExactlyOnce,
                false,
                Bytes::from_static(b"good bye"),
            )
            .await
            .expect_err("Invalid publish topic should fail");
        assert!(matches!(err, ClientError::Request(Request::Publish(_))));

        let err = client
            .publish("", QoS::ExactlyOnce, false, "good bye")
            .await
            .expect_err("Empty publish topic should fail");
        assert!(matches!(err, ClientError::Request(Request::Publish(_))));

        let err = client
            .publish_bytes("", QoS::ExactlyOnce, false, Bytes::from_static(b"good bye"))
            .await
            .expect_err("Empty publish topic should fail");
        assert!(matches!(err, ClientError::Request(Request::Publish(_))));
    });

    // None of the rejected publishes may have reached the request channel.
    assert!(rx.is_empty());
}
2107
/// Every tracked API reports `TrackingUnavailable` on a client built via `from_senders`.
#[test]
fn tracked_publish_requires_tracking_channel() {
    let (tx, _) = flume::bounded(2);
    let client = AsyncClient::from_senders(tx);
    let rt = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    // Async variants.
    rt.block_on(async {
        let outcome = client
            .publish_tracked("hello/world", QoS::AtLeastOnce, false, "good bye")
            .await;
        let err = outcome.expect_err("tracked publish should fail without tracked channel");
        assert!(matches!(err, ClientError::TrackingUnavailable));

        let payload = Bytes::from_static(b"good bye");
        let outcome = client
            .publish_bytes_tracked("hello/world", QoS::AtLeastOnce, false, payload)
            .await;
        let err = outcome.expect_err("tracked publish bytes should fail without tracked channel");
        assert!(matches!(err, ClientError::TrackingUnavailable));

        let outcome = client
            .subscribe_tracked("hello/world", QoS::AtLeastOnce)
            .await;
        let err = outcome.expect_err("tracked subscribe should fail without tracked channel");
        assert!(matches!(err, ClientError::TrackingUnavailable));

        let filters = vec![SubscribeFilter::new(
            "hello/world".to_string(),
            QoS::AtLeastOnce,
        )];
        let outcome = client.subscribe_many_tracked(filters).await;
        let err = outcome.expect_err("tracked subscribe many should fail without tracked channel");
        assert!(matches!(err, ClientError::TrackingUnavailable));

        let outcome = client.unsubscribe_tracked("hello/world").await;
        let err = outcome.expect_err("tracked unsubscribe should fail without tracked channel");
        assert!(matches!(err, ClientError::TrackingUnavailable));
    });

    // Non-blocking variants.
    let outcome = client.try_subscribe_tracked("hello/world", QoS::AtLeastOnce);
    let err = outcome.expect_err("tracked try_subscribe should fail without tracked channel");
    assert!(matches!(err, ClientError::TrackingUnavailable));

    let filters = vec![SubscribeFilter::new(
        "hello/world".to_string(),
        QoS::AtLeastOnce,
    )];
    let outcome = client.try_subscribe_many_tracked(filters);
    let err = outcome.expect_err("tracked try_subscribe_many should fail without tracked channel");
    assert!(matches!(err, ClientError::TrackingUnavailable));

    let outcome = client.try_unsubscribe_tracked("hello/world");
    let err = outcome.expect_err("tracked try_unsubscribe should fail without tracked channel");
    assert!(matches!(err, ClientError::TrackingUnavailable));
}
2175
/// A tracked unsubscribe is queued on the control channel, not the regular request channel.
#[test]
fn tracked_unsubscribe_uses_control_request_channel() {
    let (requests, requests_rx) = flume::bounded(1);
    let (control_requests, control_requests_rx) = flume::bounded(1);
    let (immediate_disconnect, _immediate_disconnect_rx) = flume::unbounded();

    let request_tx = RequestSender::WithNotice {
        requests,
        control_requests,
        immediate_disconnect,
    };
    let client = AsyncClient { request_tx };

    let rt = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let queued = rt.block_on(client.unsubscribe_tracked("hello/world"));
    queued.expect("tracked unsubscribe should enqueue");

    assert!(requests_rx.is_empty());
    let envelope = control_requests_rx
        .try_recv()
        .expect("tracked unsubscribe should use control channel");
    let request = envelope.into_parts().0;
    assert!(matches!(request, Request::Unsubscribe(_)));
}
2203}