1use std::borrow::Cow;
4use std::time::Duration;
5
6use super::eventloop::{RequestChannelCapacity, RequestEnvelope};
7use super::mqttbytes::QoS;
8use super::mqttbytes::v5::{
9 Auth, AuthProperties, AuthReasonCode, Filter, PubAck, PubRec, Publish, PublishProperties,
10 Subscribe, SubscribeProperties, Unsubscribe, UnsubscribeProperties,
11};
12use super::{
13 ConnectionError, Disconnect, DisconnectProperties, DisconnectReasonCode, Event, EventLoop,
14 MqttOptions, Request,
15};
16use crate::notice::{AuthNoticeTx, PublishNoticeTx, SubscribeNoticeTx, UnsubscribeNoticeTx};
17use crate::{
18 AuthNotice, PublishNotice, SubscribeNotice, UnsubscribeNotice, valid_filter, valid_topic,
19};
20
21use bytes::Bytes;
22use flume::{SendError, Sender, TrySendError};
23use futures_util::FutureExt;
24use tokio::runtime::{self, Runtime};
25use tokio::time::timeout;
26
/// Error returned by [`ValidatedTopic::new`] when a string is rejected as a publish topic.
///
/// The rejected string is kept inside the error and rendered in its `Display` output.
#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)]
#[error("Invalid MQTT topic: '{0}'")]
pub struct InvalidTopic(String);
31
/// A topic string that has already passed `valid_topic`.
///
/// Values are created through [`ValidatedTopic::new`]. Publish helpers in this module skip
/// the `valid_topic` check for topics supplied in this form.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ValidatedTopic(String);
42
43impl ValidatedTopic {
44 pub fn new<S: Into<String>>(topic: S) -> Result<Self, InvalidTopic> {
50 let topic_string = topic.into();
51 if valid_topic(&topic_string) {
52 Ok(Self(topic_string))
53 } else {
54 Err(InvalidTopic(topic_string))
55 }
56 }
57}
58
59impl From<ValidatedTopic> for String {
60 fn from(topic: ValidatedTopic) -> Self {
61 topic.0
62 }
63}
64
65pub enum PublishTopic {
70 Unvalidated(String),
72 Validated(ValidatedTopic),
74}
75
76impl PublishTopic {
77 fn into_string_and_validation(self) -> (String, bool) {
78 match self {
79 Self::Unvalidated(topic) => (topic, true),
80 Self::Validated(topic) => (topic.0, false),
81 }
82 }
83}
84
85impl From<ValidatedTopic> for PublishTopic {
86 fn from(topic: ValidatedTopic) -> Self {
87 Self::Validated(topic)
88 }
89}
90
91impl From<String> for PublishTopic {
92 fn from(topic: String) -> Self {
93 Self::Unvalidated(topic)
94 }
95}
96
97impl From<&str> for PublishTopic {
98 fn from(topic: &str) -> Self {
99 Self::Unvalidated(topic.to_owned())
100 }
101}
102
103impl From<&String> for PublishTopic {
104 fn from(topic: &String) -> Self {
105 Self::Unvalidated(topic.clone())
106 }
107}
108
109impl From<Cow<'_, str>> for PublishTopic {
110 fn from(topic: Cow<'_, str>) -> Self {
111 Self::Unvalidated(topic.into_owned())
112 }
113}
114
/// Errors returned by the client request APIs in this module.
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
    /// A request on an awaiting or blocking send path was rejected locally or could not be
    /// handed to the event loop. The request is returned to the caller inside the box.
    #[error("Failed to send mqtt requests to eventloop")]
    Request(Box<Request>),
    /// Same as [`ClientError::Request`], but produced by the non-blocking `try_*` paths
    /// (including a full or disconnected channel).
    #[error("Failed to send mqtt requests to eventloop")]
    TryRequest(Box<Request>),
    /// A `*_tracked` API was called on a client whose sender cannot carry notices
    /// (for example one created with `AsyncClient::from_senders`).
    #[error("Tracked request API is unavailable for this client instance")]
    TrackingUnavailable,
}
125
126impl From<SendError<Request>> for ClientError {
127 fn from(e: SendError<Request>) -> Self {
128 Self::Request(Box::new(e.into_inner()))
129 }
130}
131
132impl From<TrySendError<Request>> for ClientError {
133 fn from(e: TrySendError<Request>) -> Self {
134 Self::TryRequest(Box::new(e.into_inner()))
135 }
136}
137
/// Channel handle(s) a client uses to hand requests to the event loop.
#[derive(Clone, Debug)]
enum RequestSender {
    /// A single channel of bare [`Request`]s; tracked APIs are unavailable in this mode.
    Plain(Sender<Request>),
    /// Channels carrying [`RequestEnvelope`]s, which can attach notice senders to a request.
    WithNotice {
        /// Receives publish requests.
        requests: Sender<RequestEnvelope>,
        /// Receives every non-publish request sent through the regular send helpers.
        control_requests: Sender<RequestEnvelope>,
        /// Used only by the `send_immediate_disconnect*` helpers.
        immediate_disconnect: Sender<RequestEnvelope>,
    },
}
147
148fn into_request(envelope: RequestEnvelope) -> Request {
149 let (request, _notice) = envelope.into_parts();
150 request
151}
152
153fn map_send_envelope_error(err: SendError<RequestEnvelope>) -> ClientError {
154 ClientError::Request(Box::new(into_request(err.into_inner())))
155}
156
157fn map_try_send_envelope_error(err: TrySendError<RequestEnvelope>) -> ClientError {
158 match err {
159 TrySendError::Full(envelope) | TrySendError::Disconnected(envelope) => {
160 ClientError::TryRequest(Box::new(into_request(envelope)))
161 }
162 }
163}
164
/// Returns `true` when `request` is a [`Request::Publish`].
///
/// The send helpers use this to choose between the publish channel and the control channel.
const fn is_publish_request(request: &Request) -> bool {
    matches!(request, Request::Publish(_))
}
168
/// An acknowledgement packet prepared by the caller and sent later via
/// `manual_ack` / `try_manual_ack`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ManualAck {
    /// A `PubAck` packet to send.
    PubAck(PubAck),
    /// A `PubRec` packet to send.
    PubRec(PubRec),
}
175
176impl ManualAck {
177 fn into_request(self) -> Request {
178 match self {
179 Self::PubAck(ack) => Request::PubAck(ack),
180 Self::PubRec(rec) => Request::PubRec(rec),
181 }
182 }
183}
184
/// Asynchronous handle used to submit MQTT requests to an [`EventLoop`].
///
/// The handle only owns channel senders, so cloning it yields another handle that feeds the
/// same channels.
#[derive(Clone, Debug)]
pub struct AsyncClient {
    /// Channel(s) through which requests are delivered to the event loop.
    request_tx: RequestSender,
}
211
/// Builder for the `Client` / `Connection` pair.
#[derive(Debug)]
pub struct ClientBuilder {
    /// Connection options handed to the event loop on `build`.
    options: MqttOptions,
    /// Capacity policy of the request channels created on `build`.
    capacity: RequestChannelCapacity,
}
222
/// Builder for an [`AsyncClient`] / [`EventLoop`] pair.
#[derive(Debug)]
pub struct AsyncClientBuilder {
    /// Connection options handed to the event loop on `build`.
    options: MqttOptions,
    /// Capacity policy of the request channels created on `build`.
    capacity: RequestChannelCapacity,
}
233
234#[must_use]
235fn build_async_client(
236 options: MqttOptions,
237 capacity: RequestChannelCapacity,
238) -> (AsyncClient, EventLoop) {
239 let (eventloop, request_tx, control_request_tx, immediate_disconnect_tx) =
240 EventLoop::new_for_async_client_with_capacity(options, capacity);
241 let client = AsyncClient {
242 request_tx: RequestSender::WithNotice {
243 requests: request_tx,
244 control_requests: control_request_tx,
245 immediate_disconnect: immediate_disconnect_tx,
246 },
247 };
248
249 (client, eventloop)
250}
251
252impl ClientBuilder {
253 #[must_use]
255 pub const fn new(options: MqttOptions) -> Self {
256 let capacity = RequestChannelCapacity::Bounded(options.request_channel_capacity());
257 Self { options, capacity }
258 }
259
260 #[must_use]
265 pub const fn capacity(mut self, cap: usize) -> Self {
266 self.capacity = RequestChannelCapacity::Bounded(cap);
267 self
268 }
269
270 #[must_use]
272 pub const fn unbounded(mut self) -> Self {
273 self.capacity = RequestChannelCapacity::Unbounded;
274 self
275 }
276
277 #[must_use]
286 pub fn build(self) -> (Client, Connection) {
287 let (client, eventloop) = build_async_client(self.options, self.capacity);
288 let client = Client { client };
289
290 let runtime = runtime::Builder::new_current_thread()
291 .enable_all()
292 .build()
293 .unwrap();
294
295 let connection = Connection::new(eventloop, runtime);
296 (client, connection)
297 }
298}
299
300impl AsyncClientBuilder {
301 #[must_use]
303 pub const fn new(options: MqttOptions) -> Self {
304 let capacity = RequestChannelCapacity::Bounded(options.request_channel_capacity());
305 Self { options, capacity }
306 }
307
308 #[must_use]
313 pub const fn capacity(mut self, cap: usize) -> Self {
314 self.capacity = RequestChannelCapacity::Bounded(cap);
315 self
316 }
317
318 #[must_use]
320 pub const fn unbounded(mut self) -> Self {
321 self.capacity = RequestChannelCapacity::Unbounded;
322 self
323 }
324
325 #[must_use]
330 pub fn build(self) -> (AsyncClient, EventLoop) {
331 build_async_client(self.options, self.capacity)
332 }
333}
334
335impl AsyncClient {
    /// Returns an [`AsyncClientBuilder`] initialised with `options`.
    #[must_use]
    pub const fn builder(options: MqttOptions) -> AsyncClientBuilder {
        AsyncClientBuilder::new(options)
    }
345
346 #[must_use]
351 pub const fn from_senders(request_tx: Sender<Request>) -> Self {
352 Self {
353 request_tx: RequestSender::Plain(request_tx),
354 }
355 }
356
357 async fn send_request_async(&self, request: Request) -> Result<(), ClientError> {
358 match &self.request_tx {
359 RequestSender::Plain(tx) => tx.send_async(request).await.map_err(ClientError::from),
360 RequestSender::WithNotice {
361 requests,
362 control_requests,
363 ..
364 } => {
365 let tx = if is_publish_request(&request) {
366 requests
367 } else {
368 control_requests
369 };
370 tx.send_async(RequestEnvelope::plain(request))
371 .await
372 .map_err(map_send_envelope_error)
373 }
374 }
375 }
376
377 fn try_send_request(&self, request: Request) -> Result<(), ClientError> {
378 match &self.request_tx {
379 RequestSender::Plain(tx) => tx.try_send(request).map_err(ClientError::from),
380 RequestSender::WithNotice {
381 requests,
382 control_requests,
383 ..
384 } => {
385 let tx = if is_publish_request(&request) {
386 requests
387 } else {
388 control_requests
389 };
390 tx.try_send(RequestEnvelope::plain(request))
391 .map_err(map_try_send_envelope_error)
392 }
393 }
394 }
395
396 fn send_request(&self, request: Request) -> Result<(), ClientError> {
397 match &self.request_tx {
398 RequestSender::Plain(tx) => tx.send(request).map_err(ClientError::from),
399 RequestSender::WithNotice {
400 requests,
401 control_requests,
402 ..
403 } => {
404 let tx = if is_publish_request(&request) {
405 requests
406 } else {
407 control_requests
408 };
409 tx.send(RequestEnvelope::plain(request))
410 .map_err(map_send_envelope_error)
411 }
412 }
413 }
414
415 async fn send_immediate_disconnect_async(&self, request: Request) -> Result<(), ClientError> {
416 match &self.request_tx {
417 RequestSender::Plain(tx) => tx.send_async(request).await.map_err(ClientError::from),
418 RequestSender::WithNotice {
419 immediate_disconnect,
420 ..
421 } => immediate_disconnect
422 .send_async(RequestEnvelope::plain(request))
423 .await
424 .map_err(map_send_envelope_error),
425 }
426 }
427
428 fn send_immediate_disconnect(&self, request: Request) -> Result<(), ClientError> {
429 match &self.request_tx {
430 RequestSender::Plain(tx) => tx.send(request).map_err(ClientError::from),
431 RequestSender::WithNotice {
432 immediate_disconnect,
433 ..
434 } => immediate_disconnect
435 .send(RequestEnvelope::plain(request))
436 .map_err(map_send_envelope_error),
437 }
438 }
439
440 fn try_send_immediate_disconnect(&self, request: Request) -> Result<(), ClientError> {
441 match &self.request_tx {
442 RequestSender::Plain(tx) => tx.try_send(request).map_err(ClientError::from),
443 RequestSender::WithNotice {
444 immediate_disconnect,
445 ..
446 } => immediate_disconnect
447 .try_send(RequestEnvelope::plain(request))
448 .map_err(map_try_send_envelope_error),
449 }
450 }
451
452 async fn send_tracked_publish_async(
453 &self,
454 publish: Publish,
455 ) -> Result<PublishNotice, ClientError> {
456 let RequestSender::WithNotice {
457 requests: request_tx,
458 ..
459 } = &self.request_tx
460 else {
461 return Err(ClientError::TrackingUnavailable);
462 };
463
464 let (notice_tx, notice) = PublishNoticeTx::new();
465 request_tx
466 .send_async(RequestEnvelope::tracked_publish(publish, notice_tx))
467 .await
468 .map_err(map_send_envelope_error)?;
469 Ok(notice)
470 }
471
472 fn try_send_tracked_publish(&self, publish: Publish) -> Result<PublishNotice, ClientError> {
473 let RequestSender::WithNotice {
474 requests: request_tx,
475 ..
476 } = &self.request_tx
477 else {
478 return Err(ClientError::TrackingUnavailable);
479 };
480
481 let (notice_tx, notice) = PublishNoticeTx::new();
482 request_tx
483 .try_send(RequestEnvelope::tracked_publish(publish, notice_tx))
484 .map_err(map_try_send_envelope_error)?;
485 Ok(notice)
486 }
487
488 async fn send_tracked_subscribe_async(
489 &self,
490 subscribe: Subscribe,
491 ) -> Result<SubscribeNotice, ClientError> {
492 let RequestSender::WithNotice {
493 control_requests: request_tx,
494 ..
495 } = &self.request_tx
496 else {
497 return Err(ClientError::TrackingUnavailable);
498 };
499
500 let (notice_tx, notice) = SubscribeNoticeTx::new();
501 request_tx
502 .send_async(RequestEnvelope::tracked_subscribe(subscribe, notice_tx))
503 .await
504 .map_err(map_send_envelope_error)?;
505 Ok(notice)
506 }
507
508 fn try_send_tracked_subscribe(
509 &self,
510 subscribe: Subscribe,
511 ) -> Result<SubscribeNotice, ClientError> {
512 let RequestSender::WithNotice {
513 control_requests: request_tx,
514 ..
515 } = &self.request_tx
516 else {
517 return Err(ClientError::TrackingUnavailable);
518 };
519
520 let (notice_tx, notice) = SubscribeNoticeTx::new();
521 request_tx
522 .try_send(RequestEnvelope::tracked_subscribe(subscribe, notice_tx))
523 .map_err(map_try_send_envelope_error)?;
524 Ok(notice)
525 }
526
527 async fn send_tracked_unsubscribe_async(
528 &self,
529 unsubscribe: Unsubscribe,
530 ) -> Result<UnsubscribeNotice, ClientError> {
531 let RequestSender::WithNotice {
532 control_requests: request_tx,
533 ..
534 } = &self.request_tx
535 else {
536 return Err(ClientError::TrackingUnavailable);
537 };
538
539 let (notice_tx, notice) = UnsubscribeNoticeTx::new();
540 request_tx
541 .send_async(RequestEnvelope::tracked_unsubscribe(unsubscribe, notice_tx))
542 .await
543 .map_err(map_send_envelope_error)?;
544 Ok(notice)
545 }
546
547 fn try_send_tracked_unsubscribe(
548 &self,
549 unsubscribe: Unsubscribe,
550 ) -> Result<UnsubscribeNotice, ClientError> {
551 let RequestSender::WithNotice {
552 control_requests: request_tx,
553 ..
554 } = &self.request_tx
555 else {
556 return Err(ClientError::TrackingUnavailable);
557 };
558
559 let (notice_tx, notice) = UnsubscribeNoticeTx::new();
560 request_tx
561 .try_send(RequestEnvelope::tracked_unsubscribe(unsubscribe, notice_tx))
562 .map_err(map_try_send_envelope_error)?;
563 Ok(notice)
564 }
565
566 async fn send_tracked_auth_async(&self, auth: Auth) -> Result<AuthNotice, ClientError> {
567 let RequestSender::WithNotice {
568 control_requests: request_tx,
569 ..
570 } = &self.request_tx
571 else {
572 return Err(ClientError::TrackingUnavailable);
573 };
574
575 let (notice_tx, notice) = AuthNoticeTx::new();
576 request_tx
577 .send_async(RequestEnvelope::tracked_auth(auth, notice_tx))
578 .await
579 .map_err(map_send_envelope_error)?;
580 Ok(notice)
581 }
582
583 fn send_tracked_auth(&self, auth: Auth) -> Result<AuthNotice, ClientError> {
584 let RequestSender::WithNotice {
585 control_requests: request_tx,
586 ..
587 } = &self.request_tx
588 else {
589 return Err(ClientError::TrackingUnavailable);
590 };
591
592 let (notice_tx, notice) = AuthNoticeTx::new();
593 request_tx
594 .send(RequestEnvelope::tracked_auth(auth, notice_tx))
595 .map_err(map_send_envelope_error)?;
596 Ok(notice)
597 }
598
599 fn try_send_tracked_auth(&self, auth: Auth) -> Result<AuthNotice, ClientError> {
600 let RequestSender::WithNotice {
601 control_requests: request_tx,
602 ..
603 } = &self.request_tx
604 else {
605 return Err(ClientError::TrackingUnavailable);
606 };
607
608 let (notice_tx, notice) = AuthNoticeTx::new();
609 request_tx
610 .try_send(RequestEnvelope::tracked_auth(auth, notice_tx))
611 .map_err(map_try_send_envelope_error)?;
612 Ok(notice)
613 }
614
615 async fn handle_publish<T, P>(
617 &self,
618 topic: T,
619 qos: QoS,
620 retain: bool,
621 payload: P,
622 properties: Option<PublishProperties>,
623 ) -> Result<(), ClientError>
624 where
625 T: Into<PublishTopic>,
626 P: Into<Bytes>,
627 {
628 let (topic, needs_validation) = topic.into().into_string_and_validation();
629 let invalid_topic = (needs_validation && !valid_topic(&topic))
630 || empty_topic_without_valid_alias(&topic, properties.as_ref());
631 let mut publish = Publish::new(topic, qos, payload, properties);
632 publish.retain = retain;
633 let publish = Request::Publish(publish);
634
635 if invalid_topic {
636 return Err(ClientError::Request(Box::new(publish)));
637 }
638
639 self.send_request_async(publish).await?;
640 Ok(())
641 }
642
643 async fn handle_publish_tracked<T, P>(
644 &self,
645 topic: T,
646 qos: QoS,
647 retain: bool,
648 payload: P,
649 properties: Option<PublishProperties>,
650 ) -> Result<PublishNotice, ClientError>
651 where
652 T: Into<PublishTopic>,
653 P: Into<Bytes>,
654 {
655 let (topic, needs_validation) = topic.into().into_string_and_validation();
656 let invalid_topic = (needs_validation && !valid_topic(&topic))
657 || empty_topic_without_valid_alias(&topic, properties.as_ref());
658 let mut publish = Publish::new(topic, qos, payload, properties);
659 publish.retain = retain;
660 let request = Request::Publish(publish.clone());
661
662 if invalid_topic {
663 return Err(ClientError::Request(Box::new(request)));
664 }
665
666 self.send_tracked_publish_async(publish).await
667 }
668
    /// Publishes `payload` to `topic` with the given publish `properties`, awaiting the
    /// hand-off to the event loop.
    ///
    /// Delegates to `handle_publish` with `Some(properties)`.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::Request`] carrying the publish when the topic is rejected or
    /// the request cannot be sent.
    pub async fn publish_with_properties<T, P>(
        &self,
        topic: T,
        qos: QoS,
        retain: bool,
        payload: P,
        properties: PublishProperties,
    ) -> Result<(), ClientError>
    where
        T: Into<PublishTopic>,
        P: Into<Bytes>,
    {
        self.handle_publish(topic, qos, retain, payload, Some(properties))
            .await
    }
690
    /// Publishes `payload` to `topic` with the given publish `properties` and returns a
    /// [`PublishNotice`] associated with the request.
    ///
    /// Delegates to `handle_publish_tracked` with `Some(properties)`.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::Request`] when the topic is rejected or the request cannot be
    /// sent, and [`ClientError::TrackingUnavailable`] when the client cannot carry notices.
    pub async fn publish_with_properties_tracked<T, P>(
        &self,
        topic: T,
        qos: QoS,
        retain: bool,
        payload: P,
        properties: PublishProperties,
    ) -> Result<PublishNotice, ClientError>
    where
        T: Into<PublishTopic>,
        P: Into<Bytes>,
    {
        self.handle_publish_tracked(topic, qos, retain, payload, Some(properties))
            .await
    }
712
    /// Publishes `payload` to `topic` without publish properties, awaiting the hand-off to
    /// the event loop.
    ///
    /// Delegates to `handle_publish` with `None` for the properties.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::Request`] carrying the publish when the topic is rejected or
    /// the request cannot be sent.
    pub async fn publish<T, P>(
        &self,
        topic: T,
        qos: QoS,
        retain: bool,
        payload: P,
    ) -> Result<(), ClientError>
    where
        T: Into<PublishTopic>,
        P: Into<Bytes>,
    {
        self.handle_publish(topic, qos, retain, payload, None).await
    }
732
    /// Publishes `payload` to `topic` without publish properties and returns a
    /// [`PublishNotice`] associated with the request.
    ///
    /// Delegates to `handle_publish_tracked` with `None` for the properties.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::Request`] when the topic is rejected or the request cannot be
    /// sent, and [`ClientError::TrackingUnavailable`] when the client cannot carry notices.
    pub async fn publish_tracked<T, P>(
        &self,
        topic: T,
        qos: QoS,
        retain: bool,
        payload: P,
    ) -> Result<PublishNotice, ClientError>
    where
        T: Into<PublishTopic>,
        P: Into<Bytes>,
    {
        self.handle_publish_tracked(topic, qos, retain, payload, None)
            .await
    }
753
754 fn handle_try_publish<T, P>(
756 &self,
757 topic: T,
758 qos: QoS,
759 retain: bool,
760 payload: P,
761 properties: Option<PublishProperties>,
762 ) -> Result<(), ClientError>
763 where
764 T: Into<PublishTopic>,
765 P: Into<Bytes>,
766 {
767 let (topic, needs_validation) = topic.into().into_string_and_validation();
768 let invalid_topic = (needs_validation && !valid_topic(&topic))
769 || empty_topic_without_valid_alias(&topic, properties.as_ref());
770 let mut publish = Publish::new(topic, qos, payload, properties);
771 publish.retain = retain;
772 let publish = Request::Publish(publish);
773
774 if invalid_topic {
775 return Err(ClientError::TryRequest(Box::new(publish)));
776 }
777
778 self.try_send_request(publish)?;
779 Ok(())
780 }
781
782 fn handle_try_publish_tracked<T, P>(
783 &self,
784 topic: T,
785 qos: QoS,
786 retain: bool,
787 payload: P,
788 properties: Option<PublishProperties>,
789 ) -> Result<PublishNotice, ClientError>
790 where
791 T: Into<PublishTopic>,
792 P: Into<Bytes>,
793 {
794 let (topic, needs_validation) = topic.into().into_string_and_validation();
795 let invalid_topic = (needs_validation && !valid_topic(&topic))
796 || empty_topic_without_valid_alias(&topic, properties.as_ref());
797 let mut publish = Publish::new(topic, qos, payload, properties);
798 publish.retain = retain;
799 let request = Request::Publish(publish.clone());
800
801 if invalid_topic {
802 return Err(ClientError::TryRequest(Box::new(request)));
803 }
804
805 self.try_send_tracked_publish(publish)
806 }
807
    /// Attempts to publish `payload` to `topic` with the given publish `properties` without
    /// waiting for channel capacity.
    ///
    /// Delegates to `handle_try_publish` with `Some(properties)`.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::TryRequest`] carrying the publish when the topic is rejected or
    /// the request channel is full or disconnected.
    pub fn try_publish_with_properties<T, P>(
        &self,
        topic: T,
        qos: QoS,
        retain: bool,
        payload: P,
        properties: PublishProperties,
    ) -> Result<(), ClientError>
    where
        T: Into<PublishTopic>,
        P: Into<Bytes>,
    {
        self.handle_try_publish(topic, qos, retain, payload, Some(properties))
    }
832
    /// Attempts to publish `payload` to `topic` with the given publish `properties` without
    /// waiting, returning a [`PublishNotice`] associated with the request.
    ///
    /// Delegates to `handle_try_publish_tracked` with `Some(properties)`.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::TryRequest`] when the topic is rejected or the channel is full
    /// or disconnected, and [`ClientError::TrackingUnavailable`] when the client cannot carry
    /// notices.
    pub fn try_publish_with_properties_tracked<T, P>(
        &self,
        topic: T,
        qos: QoS,
        retain: bool,
        payload: P,
        properties: PublishProperties,
    ) -> Result<PublishNotice, ClientError>
    where
        T: Into<PublishTopic>,
        P: Into<Bytes>,
    {
        self.handle_try_publish_tracked(topic, qos, retain, payload, Some(properties))
    }
857
    /// Attempts to publish `payload` to `topic` without publish properties and without
    /// waiting for channel capacity.
    ///
    /// Delegates to `handle_try_publish` with `None` for the properties.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::TryRequest`] carrying the publish when the topic is rejected or
    /// the request channel is full or disconnected.
    pub fn try_publish<T, P>(
        &self,
        topic: T,
        qos: QoS,
        retain: bool,
        payload: P,
    ) -> Result<(), ClientError>
    where
        T: Into<PublishTopic>,
        P: Into<Bytes>,
    {
        self.handle_try_publish(topic, qos, retain, payload, None)
    }
881
    /// Attempts to publish `payload` to `topic` without publish properties and without
    /// waiting, returning a [`PublishNotice`] associated with the request.
    ///
    /// Delegates to `handle_try_publish_tracked` with `None` for the properties.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::TryRequest`] when the topic is rejected or the channel is full
    /// or disconnected, and [`ClientError::TrackingUnavailable`] when the client cannot carry
    /// notices.
    pub fn try_publish_tracked<T, P>(
        &self,
        topic: T,
        qos: QoS,
        retain: bool,
        payload: P,
    ) -> Result<PublishNotice, ClientError>
    where
        T: Into<PublishTopic>,
        P: Into<Bytes>,
    {
        self.handle_try_publish_tracked(topic, qos, retain, payload, None)
    }
905
    /// Prepares the acknowledgement for a received `publish` without sending it.
    ///
    /// Delegates to the module-level `prepare_ack` function; `self` is not consulted.
    /// NOTE(review): `None` presumably means the publish needs no acknowledgement (e.g.
    /// QoS 0) — confirm against the free function.
    pub const fn prepare_ack(&self, publish: &Publish) -> Option<ManualAck> {
        prepare_ack(publish)
    }
940
941 pub async fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
950 self.send_request_async(ack.into_request()).await?;
951 Ok(())
952 }
953
954 pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
961 self.try_send_request(ack.into_request())?;
962 Ok(())
963 }
964
965 pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
973 if let Some(ack) = self.prepare_ack(publish) {
974 self.manual_ack(ack).await?;
975 }
976 Ok(())
977 }
978
979 pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
987 if let Some(ack) = self.prepare_ack(publish) {
988 self.try_manual_ack(ack)?;
989 }
990 Ok(())
991 }
992
993 pub async fn reauth(&self, properties: Option<AuthProperties>) -> Result<(), ClientError> {
999 let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
1000 let auth = Request::Auth(auth);
1001 self.send_request_async(auth).await?;
1002 Ok(())
1003 }
1004
1005 pub async fn reauth_tracked(
1012 &self,
1013 properties: Option<AuthProperties>,
1014 ) -> Result<AuthNotice, ClientError> {
1015 let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
1016 self.send_tracked_auth_async(auth).await
1017 }
1018
1019 pub fn try_reauth(&self, properties: Option<AuthProperties>) -> Result<(), ClientError> {
1026 let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
1027 let auth = Request::Auth(auth);
1028 self.try_send_request(auth)?;
1029 Ok(())
1030 }
1031
1032 pub fn try_reauth_tracked(
1040 &self,
1041 properties: Option<AuthProperties>,
1042 ) -> Result<AuthNotice, ClientError> {
1043 let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
1044 self.try_send_tracked_auth(auth)
1045 }
1046
1047 async fn handle_publish_bytes<T>(
1049 &self,
1050 topic: T,
1051 qos: QoS,
1052 retain: bool,
1053 payload: Bytes,
1054 properties: Option<PublishProperties>,
1055 ) -> Result<(), ClientError>
1056 where
1057 T: Into<PublishTopic>,
1058 {
1059 let (topic, needs_validation) = topic.into().into_string_and_validation();
1060 let invalid_topic = (needs_validation && !valid_topic(&topic))
1061 || empty_topic_without_valid_alias(&topic, properties.as_ref());
1062 let mut publish = Publish::new(topic, qos, payload, properties);
1063 publish.retain = retain;
1064 let publish = Request::Publish(publish);
1065
1066 if invalid_topic {
1067 return Err(ClientError::Request(Box::new(publish)));
1068 }
1069
1070 self.send_request_async(publish).await?;
1071 Ok(())
1072 }
1073
1074 async fn handle_publish_bytes_tracked<T>(
1075 &self,
1076 topic: T,
1077 qos: QoS,
1078 retain: bool,
1079 payload: Bytes,
1080 properties: Option<PublishProperties>,
1081 ) -> Result<PublishNotice, ClientError>
1082 where
1083 T: Into<PublishTopic>,
1084 {
1085 let (topic, needs_validation) = topic.into().into_string_and_validation();
1086 let invalid_topic = (needs_validation && !valid_topic(&topic))
1087 || empty_topic_without_valid_alias(&topic, properties.as_ref());
1088 let mut publish = Publish::new(topic, qos, payload, properties);
1089 publish.retain = retain;
1090 let request = Request::Publish(publish.clone());
1091
1092 if invalid_topic {
1093 return Err(ClientError::Request(Box::new(request)));
1094 }
1095
1096 self.send_tracked_publish_async(publish).await
1097 }
1098
    /// Publishes an already-built `Bytes` payload to `topic` with the given publish
    /// `properties`, awaiting the hand-off to the event loop.
    ///
    /// Delegates to `handle_publish_bytes` with `Some(properties)`.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::Request`] carrying the publish when the topic is rejected or
    /// the request cannot be sent.
    pub async fn publish_bytes_with_properties<T>(
        &self,
        topic: T,
        qos: QoS,
        retain: bool,
        payload: Bytes,
        properties: PublishProperties,
    ) -> Result<(), ClientError>
    where
        T: Into<PublishTopic>,
    {
        self.handle_publish_bytes(topic, qos, retain, payload, Some(properties))
            .await
    }
1119
    /// Publishes an already-built `Bytes` payload to `topic` with the given publish
    /// `properties` and returns a [`PublishNotice`] associated with the request.
    ///
    /// Delegates to `handle_publish_bytes_tracked` with `Some(properties)`.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::Request`] when the topic is rejected or the request cannot be
    /// sent, and [`ClientError::TrackingUnavailable`] when the client cannot carry notices.
    pub async fn publish_bytes_with_properties_tracked<T>(
        &self,
        topic: T,
        qos: QoS,
        retain: bool,
        payload: Bytes,
        properties: PublishProperties,
    ) -> Result<PublishNotice, ClientError>
    where
        T: Into<PublishTopic>,
    {
        self.handle_publish_bytes_tracked(topic, qos, retain, payload, Some(properties))
            .await
    }
1140
    /// Publishes an already-built `Bytes` payload to `topic` without publish properties,
    /// awaiting the hand-off to the event loop.
    ///
    /// Delegates to `handle_publish_bytes` with `None` for the properties.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::Request`] carrying the publish when the topic is rejected or
    /// the request cannot be sent.
    pub async fn publish_bytes<T>(
        &self,
        topic: T,
        qos: QoS,
        retain: bool,
        payload: Bytes,
    ) -> Result<(), ClientError>
    where
        T: Into<PublishTopic>,
    {
        self.handle_publish_bytes(topic, qos, retain, payload, None)
            .await
    }
1160
    /// Publishes an already-built `Bytes` payload to `topic` without publish properties and
    /// returns a [`PublishNotice`] associated with the request.
    ///
    /// Delegates to `handle_publish_bytes_tracked` with `None` for the properties.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::Request`] when the topic is rejected or the request cannot be
    /// sent, and [`ClientError::TrackingUnavailable`] when the client cannot carry notices.
    pub async fn publish_bytes_tracked<T>(
        &self,
        topic: T,
        qos: QoS,
        retain: bool,
        payload: Bytes,
    ) -> Result<PublishNotice, ClientError>
    where
        T: Into<PublishTopic>,
    {
        self.handle_publish_bytes_tracked(topic, qos, retain, payload, None)
            .await
    }
1180
1181 async fn handle_subscribe<S: Into<String>>(
1183 &self,
1184 topic: S,
1185 qos: QoS,
1186 properties: Option<SubscribeProperties>,
1187 ) -> Result<(), ClientError> {
1188 let filter = Filter::new(topic, qos);
1189 let subscribe = Subscribe::new(filter, properties);
1190 if !subscribe_has_valid_filters(&subscribe) {
1191 return Err(ClientError::Request(Box::new(subscribe.into())));
1192 }
1193
1194 self.send_request_async(subscribe.into()).await?;
1195 Ok(())
1196 }
1197
1198 async fn handle_subscribe_tracked<S: Into<String>>(
1199 &self,
1200 topic: S,
1201 qos: QoS,
1202 properties: Option<SubscribeProperties>,
1203 ) -> Result<SubscribeNotice, ClientError> {
1204 let filter = Filter::new(topic, qos);
1205 let subscribe = Subscribe::new(filter, properties);
1206 if !subscribe_has_valid_filters(&subscribe) {
1207 return Err(ClientError::Request(Box::new(subscribe.into())));
1208 }
1209
1210 self.send_tracked_subscribe_async(subscribe).await
1211 }
1212
    /// Subscribes to `topic` at `qos` with the given subscribe `properties`, awaiting the
    /// hand-off to the event loop.
    ///
    /// Delegates to `handle_subscribe` with `Some(properties)`.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::Request`] carrying the subscribe when the filter is rejected or
    /// the request cannot be sent.
    pub async fn subscribe_with_properties<S: Into<String>>(
        &self,
        topic: S,
        qos: QoS,
        properties: SubscribeProperties,
    ) -> Result<(), ClientError> {
        self.handle_subscribe(topic, qos, Some(properties)).await
    }
1227
    /// Subscribes to `topic` at `qos` with the given subscribe `properties` and returns a
    /// [`SubscribeNotice`] associated with the request.
    ///
    /// Delegates to `handle_subscribe_tracked` with `Some(properties)`.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::Request`] when the filter is rejected or the request cannot be
    /// sent, and [`ClientError::TrackingUnavailable`] when the client cannot carry notices.
    pub async fn subscribe_with_properties_tracked<S: Into<String>>(
        &self,
        topic: S,
        qos: QoS,
        properties: SubscribeProperties,
    ) -> Result<SubscribeNotice, ClientError> {
        self.handle_subscribe_tracked(topic, qos, Some(properties))
            .await
    }
1243
    /// Subscribes to `topic` at `qos` without subscribe properties, awaiting the hand-off to
    /// the event loop.
    ///
    /// Delegates to `handle_subscribe` with `None` for the properties.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::Request`] carrying the subscribe when the filter is rejected or
    /// the request cannot be sent.
    pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
        self.handle_subscribe(topic, qos, None).await
    }
1253
    /// Subscribes to `topic` at `qos` without subscribe properties and returns a
    /// [`SubscribeNotice`] associated with the request.
    ///
    /// Delegates to `handle_subscribe_tracked` with `None` for the properties.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::Request`] when the filter is rejected or the request cannot be
    /// sent, and [`ClientError::TrackingUnavailable`] when the client cannot carry notices.
    pub async fn subscribe_tracked<S: Into<String>>(
        &self,
        topic: S,
        qos: QoS,
    ) -> Result<SubscribeNotice, ClientError> {
        self.handle_subscribe_tracked(topic, qos, None).await
    }
1267
1268 fn handle_try_subscribe<S: Into<String>>(
1270 &self,
1271 topic: S,
1272 qos: QoS,
1273 properties: Option<SubscribeProperties>,
1274 ) -> Result<(), ClientError> {
1275 let filter = Filter::new(topic, qos);
1276 let subscribe = Subscribe::new(filter, properties);
1277 if !subscribe_has_valid_filters(&subscribe) {
1278 return Err(ClientError::TryRequest(Box::new(subscribe.into())));
1279 }
1280
1281 self.try_send_request(subscribe.into())?;
1282 Ok(())
1283 }
1284
1285 fn handle_try_subscribe_tracked<S: Into<String>>(
1286 &self,
1287 topic: S,
1288 qos: QoS,
1289 properties: Option<SubscribeProperties>,
1290 ) -> Result<SubscribeNotice, ClientError> {
1291 let filter = Filter::new(topic, qos);
1292 let subscribe = Subscribe::new(filter, properties);
1293 if !subscribe_has_valid_filters(&subscribe) {
1294 return Err(ClientError::TryRequest(Box::new(subscribe.into())));
1295 }
1296
1297 self.try_send_tracked_subscribe(subscribe)
1298 }
1299
    /// Attempts to subscribe to `topic` at `qos` with the given subscribe `properties`
    /// without waiting for channel capacity.
    ///
    /// Delegates to `handle_try_subscribe` with `Some(properties)`.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::TryRequest`] carrying the subscribe when the filter is rejected
    /// or the request channel is full or disconnected.
    pub fn try_subscribe_with_properties<S: Into<String>>(
        &self,
        topic: S,
        qos: QoS,
        properties: SubscribeProperties,
    ) -> Result<(), ClientError> {
        self.handle_try_subscribe(topic, qos, Some(properties))
    }
1314
    /// Attempts to subscribe to `topic` at `qos` with the given subscribe `properties`
    /// without waiting, returning a [`SubscribeNotice`] associated with the request.
    ///
    /// Delegates to `handle_try_subscribe_tracked` with `Some(properties)`.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::TryRequest`] when the filter is rejected or the channel is full
    /// or disconnected, and [`ClientError::TrackingUnavailable`] when the client cannot carry
    /// notices.
    pub fn try_subscribe_with_properties_tracked<S: Into<String>>(
        &self,
        topic: S,
        qos: QoS,
        properties: SubscribeProperties,
    ) -> Result<SubscribeNotice, ClientError> {
        self.handle_try_subscribe_tracked(topic, qos, Some(properties))
    }
1329
    /// Attempts to subscribe to `topic` at `qos` without subscribe properties and without
    /// waiting for channel capacity.
    ///
    /// Delegates to `handle_try_subscribe` with `None` for the properties.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::TryRequest`] carrying the subscribe when the filter is rejected
    /// or the request channel is full or disconnected.
    pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
        self.handle_try_subscribe(topic, qos, None)
    }
1339
    /// Attempts to subscribe to `topic` at `qos` without subscribe properties and without
    /// waiting, returning a [`SubscribeNotice`] associated with the request.
    ///
    /// Delegates to `handle_try_subscribe_tracked` with `None` for the properties.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::TryRequest`] when the filter is rejected or the channel is full
    /// or disconnected, and [`ClientError::TrackingUnavailable`] when the client cannot carry
    /// notices.
    pub fn try_subscribe_tracked<S: Into<String>>(
        &self,
        topic: S,
        qos: QoS,
    ) -> Result<SubscribeNotice, ClientError> {
        self.handle_try_subscribe_tracked(topic, qos, None)
    }
1353
1354 async fn handle_subscribe_many<T>(
1356 &self,
1357 topics: T,
1358 properties: Option<SubscribeProperties>,
1359 ) -> Result<(), ClientError>
1360 where
1361 T: IntoIterator<Item = Filter>,
1362 {
1363 let subscribe = Subscribe::new_many(topics, properties);
1364 if !subscribe_has_valid_filters(&subscribe) {
1365 return Err(ClientError::Request(Box::new(subscribe.into())));
1366 }
1367
1368 self.send_request_async(subscribe.into()).await?;
1369
1370 Ok(())
1371 }
1372
1373 async fn handle_subscribe_many_tracked<T>(
1374 &self,
1375 topics: T,
1376 properties: Option<SubscribeProperties>,
1377 ) -> Result<SubscribeNotice, ClientError>
1378 where
1379 T: IntoIterator<Item = Filter>,
1380 {
1381 let subscribe = Subscribe::new_many(topics, properties);
1382 if !subscribe_has_valid_filters(&subscribe) {
1383 return Err(ClientError::Request(Box::new(subscribe.into())));
1384 }
1385
1386 self.send_tracked_subscribe_async(subscribe).await
1387 }
1388
    /// Subscribes to all `topics` in one request with the given subscribe `properties`,
    /// awaiting the hand-off to the event loop.
    ///
    /// Delegates to `handle_subscribe_many` with `Some(properties)`.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::Request`] carrying the subscribe when the filters are rejected
    /// or the request cannot be sent.
    pub async fn subscribe_many_with_properties<T>(
        &self,
        topics: T,
        properties: SubscribeProperties,
    ) -> Result<(), ClientError>
    where
        T: IntoIterator<Item = Filter>,
    {
        self.handle_subscribe_many(topics, Some(properties)).await
    }
1405
    /// Subscribes to all `topics` in one request with the given subscribe `properties` and
    /// returns a [`SubscribeNotice`] associated with the request.
    ///
    /// Delegates to `handle_subscribe_many_tracked` with `Some(properties)`.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::Request`] when the filters are rejected or the request cannot
    /// be sent, and [`ClientError::TrackingUnavailable`] when the client cannot carry notices.
    pub async fn subscribe_many_with_properties_tracked<T>(
        &self,
        topics: T,
        properties: SubscribeProperties,
    ) -> Result<SubscribeNotice, ClientError>
    where
        T: IntoIterator<Item = Filter>,
    {
        self.handle_subscribe_many_tracked(topics, Some(properties))
            .await
    }
1423
    /// Subscribes to all `topics` in one request without subscribe properties, awaiting the
    /// hand-off to the event loop.
    ///
    /// Delegates to `handle_subscribe_many` with `None` for the properties.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::Request`] carrying the subscribe when the filters are rejected
    /// or the request cannot be sent.
    pub async fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
    where
        T: IntoIterator<Item = Filter>,
    {
        self.handle_subscribe_many(topics, None).await
    }
1436
    /// Subscribes to all `topics` in one request without subscribe properties and returns a
    /// [`SubscribeNotice`] associated with the request.
    ///
    /// Delegates to `handle_subscribe_many_tracked` with `None` for the properties.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::Request`] when the filters are rejected or the request cannot
    /// be sent, and [`ClientError::TrackingUnavailable`] when the client cannot carry notices.
    pub async fn subscribe_many_tracked<T>(&self, topics: T) -> Result<SubscribeNotice, ClientError>
    where
        T: IntoIterator<Item = Filter>,
    {
        self.handle_subscribe_many_tracked(topics, None).await
    }
1449
1450 fn handle_try_subscribe_many<T>(
1452 &self,
1453 topics: T,
1454 properties: Option<SubscribeProperties>,
1455 ) -> Result<(), ClientError>
1456 where
1457 T: IntoIterator<Item = Filter>,
1458 {
1459 let subscribe = Subscribe::new_many(topics, properties);
1460 if !subscribe_has_valid_filters(&subscribe) {
1461 return Err(ClientError::TryRequest(Box::new(subscribe.into())));
1462 }
1463
1464 self.try_send_request(subscribe.into())?;
1465 Ok(())
1466 }
1467
1468 fn handle_try_subscribe_many_tracked<T>(
1469 &self,
1470 topics: T,
1471 properties: Option<SubscribeProperties>,
1472 ) -> Result<SubscribeNotice, ClientError>
1473 where
1474 T: IntoIterator<Item = Filter>,
1475 {
1476 let subscribe = Subscribe::new_many(topics, properties);
1477 if !subscribe_has_valid_filters(&subscribe) {
1478 return Err(ClientError::TryRequest(Box::new(subscribe.into())));
1479 }
1480
1481 self.try_send_tracked_subscribe(subscribe)
1482 }
1483
    /// Attempts to subscribe to all `topics` in one request with the given subscribe
    /// `properties`, without waiting for channel capacity.
    ///
    /// Delegates to `handle_try_subscribe_many` with `Some(properties)`.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::TryRequest`] carrying the subscribe when the filters are
    /// rejected or the request channel is full or disconnected.
    pub fn try_subscribe_many_with_properties<T>(
        &self,
        topics: T,
        properties: SubscribeProperties,
    ) -> Result<(), ClientError>
    where
        T: IntoIterator<Item = Filter>,
    {
        self.handle_try_subscribe_many(topics, Some(properties))
    }
1500
    /// Attempts to subscribe to all `topics` in one request with the given subscribe
    /// `properties` without waiting, returning a [`SubscribeNotice`] associated with the
    /// request.
    ///
    /// Delegates to `handle_try_subscribe_many_tracked` with `Some(properties)`.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::TryRequest`] when the filters are rejected or the channel is
    /// full or disconnected, and [`ClientError::TrackingUnavailable`] when the client cannot
    /// carry notices.
    pub fn try_subscribe_many_with_properties_tracked<T>(
        &self,
        topics: T,
        properties: SubscribeProperties,
    ) -> Result<SubscribeNotice, ClientError>
    where
        T: IntoIterator<Item = Filter>,
    {
        self.handle_try_subscribe_many_tracked(topics, Some(properties))
    }
1517
    /// Attempts to subscribe to all `topics` in one request without subscribe properties and
    /// without waiting for channel capacity.
    ///
    /// Delegates to `handle_try_subscribe_many` with `None` for the properties.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::TryRequest`] carrying the subscribe when the filters are
    /// rejected or the request channel is full or disconnected.
    pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
    where
        T: IntoIterator<Item = Filter>,
    {
        self.handle_try_subscribe_many(topics, None)
    }
1530
1531 pub fn try_subscribe_many_tracked<T>(&self, topics: T) -> Result<SubscribeNotice, ClientError>
1538 where
1539 T: IntoIterator<Item = Filter>,
1540 {
1541 self.handle_try_subscribe_many_tracked(topics, None)
1542 }
1543
1544 async fn handle_unsubscribe<S: Into<String>>(
1546 &self,
1547 topic: S,
1548 properties: Option<UnsubscribeProperties>,
1549 ) -> Result<(), ClientError> {
1550 let unsubscribe = Unsubscribe::new(topic, properties);
1551 let request = Request::Unsubscribe(unsubscribe);
1552 self.send_request_async(request).await?;
1553 Ok(())
1554 }
1555
1556 async fn handle_unsubscribe_tracked<S: Into<String>>(
1557 &self,
1558 topic: S,
1559 properties: Option<UnsubscribeProperties>,
1560 ) -> Result<UnsubscribeNotice, ClientError> {
1561 let unsubscribe = Unsubscribe::new(topic, properties);
1562 self.send_tracked_unsubscribe_async(unsubscribe).await
1563 }
1564
1565 pub async fn unsubscribe_with_properties<S: Into<String>>(
1571 &self,
1572 topic: S,
1573 properties: UnsubscribeProperties,
1574 ) -> Result<(), ClientError> {
1575 self.handle_unsubscribe(topic, Some(properties)).await
1576 }
1577
1578 pub async fn unsubscribe_with_properties_tracked<S: Into<String>>(
1584 &self,
1585 topic: S,
1586 properties: UnsubscribeProperties,
1587 ) -> Result<UnsubscribeNotice, ClientError> {
1588 self.handle_unsubscribe_tracked(topic, Some(properties))
1589 .await
1590 }
1591
1592 pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
1598 self.handle_unsubscribe(topic, None).await
1599 }
1600
1601 pub async fn unsubscribe_tracked<S: Into<String>>(
1607 &self,
1608 topic: S,
1609 ) -> Result<UnsubscribeNotice, ClientError> {
1610 self.handle_unsubscribe_tracked(topic, None).await
1611 }
1612
1613 fn handle_try_unsubscribe<S: Into<String>>(
1615 &self,
1616 topic: S,
1617 properties: Option<UnsubscribeProperties>,
1618 ) -> Result<(), ClientError> {
1619 let unsubscribe = Unsubscribe::new(topic, properties);
1620 let request = Request::Unsubscribe(unsubscribe);
1621 self.try_send_request(request)?;
1622 Ok(())
1623 }
1624
1625 fn handle_try_unsubscribe_tracked<S: Into<String>>(
1626 &self,
1627 topic: S,
1628 properties: Option<UnsubscribeProperties>,
1629 ) -> Result<UnsubscribeNotice, ClientError> {
1630 let unsubscribe = Unsubscribe::new(topic, properties);
1631 self.try_send_tracked_unsubscribe(unsubscribe)
1632 }
1633
1634 pub fn try_unsubscribe_with_properties<S: Into<String>>(
1641 &self,
1642 topic: S,
1643 properties: UnsubscribeProperties,
1644 ) -> Result<(), ClientError> {
1645 self.handle_try_unsubscribe(topic, Some(properties))
1646 }
1647
1648 pub fn try_unsubscribe_with_properties_tracked<S: Into<String>>(
1655 &self,
1656 topic: S,
1657 properties: UnsubscribeProperties,
1658 ) -> Result<UnsubscribeNotice, ClientError> {
1659 self.handle_try_unsubscribe_tracked(topic, Some(properties))
1660 }
1661
1662 pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
1669 self.handle_try_unsubscribe(topic, None)
1670 }
1671
1672 pub fn try_unsubscribe_tracked<S: Into<String>>(
1679 &self,
1680 topic: S,
1681 ) -> Result<UnsubscribeNotice, ClientError> {
1682 self.handle_try_unsubscribe_tracked(topic, None)
1683 }
1684
1685 pub async fn disconnect(&self) -> Result<(), ClientError> {
1704 self.handle_disconnect(DisconnectReasonCode::NormalDisconnection, None)
1705 .await
1706 }
1707
1708 pub async fn disconnect_with_properties(
1726 &self,
1727 reason: DisconnectReasonCode,
1728 properties: DisconnectProperties,
1729 ) -> Result<(), ClientError> {
1730 self.handle_disconnect(reason, Some(properties)).await
1731 }
1732
1733 pub async fn disconnect_with_timeout(&self, timeout: Duration) -> Result<(), ClientError> {
1756 self.handle_disconnect_with_timeout(
1757 DisconnectReasonCode::NormalDisconnection,
1758 None,
1759 timeout,
1760 )
1761 .await
1762 }
1763
1764 pub async fn disconnect_with_properties_timeout(
1787 &self,
1788 reason: DisconnectReasonCode,
1789 properties: DisconnectProperties,
1790 timeout: Duration,
1791 ) -> Result<(), ClientError> {
1792 self.handle_disconnect_with_timeout(reason, Some(properties), timeout)
1793 .await
1794 }
1795
1796 pub async fn disconnect_now(&self) -> Result<(), ClientError> {
1807 self.handle_disconnect_now(DisconnectReasonCode::NormalDisconnection, None)
1808 .await
1809 }
1810
1811 pub async fn disconnect_now_with_properties(
1822 &self,
1823 reason: DisconnectReasonCode,
1824 properties: DisconnectProperties,
1825 ) -> Result<(), ClientError> {
1826 self.handle_disconnect_now(reason, Some(properties)).await
1827 }
1828
1829 async fn handle_disconnect(
1831 &self,
1832 reason: DisconnectReasonCode,
1833 properties: Option<DisconnectProperties>,
1834 ) -> Result<(), ClientError> {
1835 let request = Self::build_disconnect_request(reason, properties);
1836 self.send_request_async(request).await?;
1837 Ok(())
1838 }
1839
1840 async fn handle_disconnect_with_timeout(
1841 &self,
1842 reason: DisconnectReasonCode,
1843 properties: Option<DisconnectProperties>,
1844 timeout: Duration,
1845 ) -> Result<(), ClientError> {
1846 let disconnect = Self::build_disconnect_packet(reason, properties);
1847 self.send_request_async(Request::DisconnectWithTimeout(disconnect, timeout))
1848 .await?;
1849 Ok(())
1850 }
1851
1852 async fn handle_disconnect_now(
1853 &self,
1854 reason: DisconnectReasonCode,
1855 properties: Option<DisconnectProperties>,
1856 ) -> Result<(), ClientError> {
1857 let disconnect = Self::build_disconnect_packet(reason, properties);
1858 self.send_immediate_disconnect_async(Request::DisconnectNow(disconnect))
1859 .await?;
1860 Ok(())
1861 }
1862
1863 pub fn try_disconnect(&self) -> Result<(), ClientError> {
1882 self.handle_try_disconnect(DisconnectReasonCode::NormalDisconnection, None)
1883 }
1884
1885 pub fn try_disconnect_with_properties(
1903 &self,
1904 reason: DisconnectReasonCode,
1905 properties: DisconnectProperties,
1906 ) -> Result<(), ClientError> {
1907 self.handle_try_disconnect(reason, Some(properties))
1908 }
1909
1910 pub fn try_disconnect_with_timeout(&self, timeout: Duration) -> Result<(), ClientError> {
1933 self.handle_try_disconnect_with_timeout(
1934 DisconnectReasonCode::NormalDisconnection,
1935 None,
1936 timeout,
1937 )
1938 }
1939
1940 pub fn try_disconnect_with_properties_timeout(
1963 &self,
1964 reason: DisconnectReasonCode,
1965 properties: DisconnectProperties,
1966 timeout: Duration,
1967 ) -> Result<(), ClientError> {
1968 self.handle_try_disconnect_with_timeout(reason, Some(properties), timeout)
1969 }
1970
1971 pub fn try_disconnect_now(&self) -> Result<(), ClientError> {
1982 self.handle_try_disconnect_now(DisconnectReasonCode::NormalDisconnection, None)
1983 }
1984
1985 pub fn try_disconnect_now_with_properties(
1996 &self,
1997 reason: DisconnectReasonCode,
1998 properties: DisconnectProperties,
1999 ) -> Result<(), ClientError> {
2000 self.handle_try_disconnect_now(reason, Some(properties))
2001 }
2002
2003 fn handle_try_disconnect(
2005 &self,
2006 reason: DisconnectReasonCode,
2007 properties: Option<DisconnectProperties>,
2008 ) -> Result<(), ClientError> {
2009 let request = Self::build_disconnect_request(reason, properties);
2010 self.try_send_request(request)?;
2011 Ok(())
2012 }
2013
2014 fn handle_try_disconnect_with_timeout(
2015 &self,
2016 reason: DisconnectReasonCode,
2017 properties: Option<DisconnectProperties>,
2018 timeout: Duration,
2019 ) -> Result<(), ClientError> {
2020 let disconnect = Self::build_disconnect_packet(reason, properties);
2021 self.try_send_request(Request::DisconnectWithTimeout(disconnect, timeout))?;
2022 Ok(())
2023 }
2024
2025 fn handle_try_disconnect_now(
2026 &self,
2027 reason: DisconnectReasonCode,
2028 properties: Option<DisconnectProperties>,
2029 ) -> Result<(), ClientError> {
2030 let disconnect = Self::build_disconnect_packet(reason, properties);
2031 self.try_send_immediate_disconnect(Request::DisconnectNow(disconnect))?;
2032 Ok(())
2033 }
2034
2035 fn build_disconnect_request(
2037 reason: DisconnectReasonCode,
2038 properties: Option<DisconnectProperties>,
2039 ) -> Request {
2040 Request::Disconnect(Self::build_disconnect_packet(reason, properties))
2041 }
2042
2043 fn build_disconnect_packet(
2044 reason: DisconnectReasonCode,
2045 properties: Option<DisconnectProperties>,
2046 ) -> Disconnect {
2047 properties.map_or_else(
2048 || Disconnect::new(reason),
2049 |p| Disconnect::new_with_properties(reason, p),
2050 )
2051 }
2052}
2053
2054const fn prepare_ack(publish: &Publish) -> Option<ManualAck> {
2055 let ack = match publish.qos {
2056 QoS::AtMostOnce => return None,
2057 QoS::AtLeastOnce => ManualAck::PubAck(PubAck::new(publish.pkid, None)),
2058 QoS::ExactlyOnce => ManualAck::PubRec(PubRec::new(publish.pkid, None)),
2059 };
2060 Some(ack)
2061}
2062
/// Client handle whose methods are plain (non-`async`) functions that
/// forward to the wrapped [`AsyncClient`].
#[derive(Clone)]
pub struct Client {
    /// Underlying client that all requests are forwarded to.
    client: AsyncClient,
}
2077
2078impl Client {
2079 #[must_use]
2085 pub const fn builder(options: MqttOptions) -> ClientBuilder {
2086 ClientBuilder::new(options)
2087 }
2088
2089 #[must_use]
2094 pub const fn from_sender(request_tx: Sender<Request>) -> Self {
2095 Self {
2096 client: AsyncClient::from_senders(request_tx),
2097 }
2098 }
2099
2100 fn handle_publish<T, P>(
2102 &self,
2103 topic: T,
2104 qos: QoS,
2105 retain: bool,
2106 payload: P,
2107 properties: Option<PublishProperties>,
2108 ) -> Result<(), ClientError>
2109 where
2110 T: Into<PublishTopic>,
2111 P: Into<Bytes>,
2112 {
2113 let (topic, needs_validation) = topic.into().into_string_and_validation();
2114 let invalid_topic = (needs_validation && !valid_topic(&topic))
2115 || empty_topic_without_valid_alias(&topic, properties.as_ref());
2116 let mut publish = Publish::new(topic, qos, payload, properties);
2117 publish.retain = retain;
2118 let request = Request::Publish(publish);
2119
2120 if invalid_topic {
2121 return Err(ClientError::Request(Box::new(request)));
2122 }
2123
2124 self.client.send_request(request)?;
2125 Ok(())
2126 }
2127
2128 pub fn publish_with_properties<T, P>(
2135 &self,
2136 topic: T,
2137 qos: QoS,
2138 retain: bool,
2139 payload: P,
2140 properties: PublishProperties,
2141 ) -> Result<(), ClientError>
2142 where
2143 T: Into<PublishTopic>,
2144 P: Into<Bytes>,
2145 {
2146 self.handle_publish(topic, qos, retain, payload, Some(properties))
2147 }
2148
2149 pub fn publish<T, P>(
2156 &self,
2157 topic: T,
2158 qos: QoS,
2159 retain: bool,
2160 payload: P,
2161 ) -> Result<(), ClientError>
2162 where
2163 T: Into<PublishTopic>,
2164 P: Into<Bytes>,
2165 {
2166 self.handle_publish(topic, qos, retain, payload, None)
2167 }
2168
2169 pub fn try_publish_with_properties<T, P>(
2176 &self,
2177 topic: T,
2178 qos: QoS,
2179 retain: bool,
2180 payload: P,
2181 properties: PublishProperties,
2182 ) -> Result<(), ClientError>
2183 where
2184 T: Into<PublishTopic>,
2185 P: Into<Bytes>,
2186 {
2187 self.client
2188 .try_publish_with_properties(topic, qos, retain, payload, properties)
2189 }
2190
2191 pub fn try_publish<T, P>(
2198 &self,
2199 topic: T,
2200 qos: QoS,
2201 retain: bool,
2202 payload: P,
2203 ) -> Result<(), ClientError>
2204 where
2205 T: Into<PublishTopic>,
2206 P: Into<Bytes>,
2207 {
2208 self.client.try_publish(topic, qos, retain, payload)
2209 }
2210
2211 pub const fn prepare_ack(&self, publish: &Publish) -> Option<ManualAck> {
2215 self.client.prepare_ack(publish)
2216 }
2217
2218 pub fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
2225 self.client.send_request(ack.into_request())?;
2226 Ok(())
2227 }
2228
2229 pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
2236 self.client.try_manual_ack(ack)?;
2237 Ok(())
2238 }
2239
2240 pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
2248 if let Some(ack) = self.prepare_ack(publish) {
2249 self.manual_ack(ack)?;
2250 }
2251 Ok(())
2252 }
2253
2254 pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
2262 if let Some(ack) = self.prepare_ack(publish) {
2263 self.try_manual_ack(ack)?;
2264 }
2265 Ok(())
2266 }
2267
2268 pub fn reauth(&self, properties: Option<AuthProperties>) -> Result<(), ClientError> {
2274 let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
2275 let auth = Request::Auth(auth);
2276 self.client.send_request(auth)?;
2277 Ok(())
2278 }
2279
2280 pub fn reauth_tracked(
2287 &self,
2288 properties: Option<AuthProperties>,
2289 ) -> Result<AuthNotice, ClientError> {
2290 let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
2291 self.client.send_tracked_auth(auth)
2292 }
2293
2294 pub fn try_reauth(&self, properties: Option<AuthProperties>) -> Result<(), ClientError> {
2301 let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
2302 let auth = Request::Auth(auth);
2303 self.client.try_send_request(auth)?;
2304 Ok(())
2305 }
2306
2307 pub fn try_reauth_tracked(
2315 &self,
2316 properties: Option<AuthProperties>,
2317 ) -> Result<AuthNotice, ClientError> {
2318 let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
2319 self.client.try_send_tracked_auth(auth)
2320 }
2321
2322 fn handle_subscribe<S: Into<String>>(
2324 &self,
2325 topic: S,
2326 qos: QoS,
2327 properties: Option<SubscribeProperties>,
2328 ) -> Result<(), ClientError> {
2329 let filter = Filter::new(topic, qos);
2330 let subscribe = Subscribe::new(filter, properties);
2331 if !subscribe_has_valid_filters(&subscribe) {
2332 return Err(ClientError::Request(Box::new(subscribe.into())));
2333 }
2334
2335 self.client.send_request(subscribe.into())?;
2336 Ok(())
2337 }
2338
2339 pub fn subscribe_with_properties<S: Into<String>>(
2346 &self,
2347 topic: S,
2348 qos: QoS,
2349 properties: SubscribeProperties,
2350 ) -> Result<(), ClientError> {
2351 self.handle_subscribe(topic, qos, Some(properties))
2352 }
2353
2354 pub fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
2361 self.handle_subscribe(topic, qos, None)
2362 }
2363
2364 pub fn try_subscribe_with_properties<S: Into<String>>(
2371 &self,
2372 topic: S,
2373 qos: QoS,
2374 properties: SubscribeProperties,
2375 ) -> Result<(), ClientError> {
2376 self.client
2377 .try_subscribe_with_properties(topic, qos, properties)
2378 }
2379
2380 pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
2387 self.client.try_subscribe(topic, qos)
2388 }
2389
2390 fn handle_subscribe_many<T>(
2392 &self,
2393 topics: T,
2394 properties: Option<SubscribeProperties>,
2395 ) -> Result<(), ClientError>
2396 where
2397 T: IntoIterator<Item = Filter>,
2398 {
2399 let subscribe = Subscribe::new_many(topics, properties);
2400 if !subscribe_has_valid_filters(&subscribe) {
2401 return Err(ClientError::Request(Box::new(subscribe.into())));
2402 }
2403
2404 self.client.send_request(subscribe.into())?;
2405 Ok(())
2406 }
2407
2408 pub fn subscribe_many_with_properties<T>(
2415 &self,
2416 topics: T,
2417 properties: SubscribeProperties,
2418 ) -> Result<(), ClientError>
2419 where
2420 T: IntoIterator<Item = Filter>,
2421 {
2422 self.handle_subscribe_many(topics, Some(properties))
2423 }
2424
2425 pub fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
2432 where
2433 T: IntoIterator<Item = Filter>,
2434 {
2435 self.handle_subscribe_many(topics, None)
2436 }
2437
2438 pub fn try_subscribe_many_with_properties<T>(
2445 &self,
2446 topics: T,
2447 properties: SubscribeProperties,
2448 ) -> Result<(), ClientError>
2449 where
2450 T: IntoIterator<Item = Filter>,
2451 {
2452 self.client
2453 .try_subscribe_many_with_properties(topics, properties)
2454 }
2455
2456 pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
2463 where
2464 T: IntoIterator<Item = Filter>,
2465 {
2466 self.client.try_subscribe_many(topics)
2467 }
2468
2469 fn handle_unsubscribe<S: Into<String>>(
2471 &self,
2472 topic: S,
2473 properties: Option<UnsubscribeProperties>,
2474 ) -> Result<(), ClientError> {
2475 let unsubscribe = Unsubscribe::new(topic, properties);
2476 let request = Request::Unsubscribe(unsubscribe);
2477 self.client.send_request(request)?;
2478 Ok(())
2479 }
2480
2481 pub fn unsubscribe_with_properties<S: Into<String>>(
2487 &self,
2488 topic: S,
2489 properties: UnsubscribeProperties,
2490 ) -> Result<(), ClientError> {
2491 self.handle_unsubscribe(topic, Some(properties))
2492 }
2493
2494 pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
2500 self.handle_unsubscribe(topic, None)
2501 }
2502
2503 pub fn try_unsubscribe_with_properties<S: Into<String>>(
2510 &self,
2511 topic: S,
2512 properties: UnsubscribeProperties,
2513 ) -> Result<(), ClientError> {
2514 self.client
2515 .try_unsubscribe_with_properties(topic, properties)
2516 }
2517
2518 pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
2525 self.client.try_unsubscribe(topic)
2526 }
2527
2528 pub fn disconnect(&self) -> Result<(), ClientError> {
2546 self.handle_disconnect(DisconnectReasonCode::NormalDisconnection, None)
2547 }
2548
2549 pub fn disconnect_with_properties(
2567 &self,
2568 reason: DisconnectReasonCode,
2569 properties: DisconnectProperties,
2570 ) -> Result<(), ClientError> {
2571 self.handle_disconnect(reason, Some(properties))
2572 }
2573
2574 pub fn disconnect_with_timeout(&self, timeout: Duration) -> Result<(), ClientError> {
2597 self.handle_disconnect_with_timeout(
2598 DisconnectReasonCode::NormalDisconnection,
2599 None,
2600 timeout,
2601 )
2602 }
2603
2604 pub fn disconnect_with_properties_timeout(
2627 &self,
2628 reason: DisconnectReasonCode,
2629 properties: DisconnectProperties,
2630 timeout: Duration,
2631 ) -> Result<(), ClientError> {
2632 self.handle_disconnect_with_timeout(reason, Some(properties), timeout)
2633 }
2634
2635 pub fn disconnect_now(&self) -> Result<(), ClientError> {
2646 self.handle_disconnect_now(DisconnectReasonCode::NormalDisconnection, None)
2647 }
2648
2649 pub fn disconnect_now_with_properties(
2660 &self,
2661 reason: DisconnectReasonCode,
2662 properties: DisconnectProperties,
2663 ) -> Result<(), ClientError> {
2664 self.handle_disconnect_now(reason, Some(properties))
2665 }
2666
2667 fn handle_disconnect(
2668 &self,
2669 reason: DisconnectReasonCode,
2670 properties: Option<DisconnectProperties>,
2671 ) -> Result<(), ClientError> {
2672 let request = AsyncClient::build_disconnect_request(reason, properties);
2673 self.client.send_request(request)?;
2674 Ok(())
2675 }
2676
2677 fn handle_disconnect_with_timeout(
2678 &self,
2679 reason: DisconnectReasonCode,
2680 properties: Option<DisconnectProperties>,
2681 timeout: Duration,
2682 ) -> Result<(), ClientError> {
2683 let disconnect = AsyncClient::build_disconnect_packet(reason, properties);
2684 self.client
2685 .send_request(Request::DisconnectWithTimeout(disconnect, timeout))?;
2686 Ok(())
2687 }
2688
2689 fn handle_disconnect_now(
2690 &self,
2691 reason: DisconnectReasonCode,
2692 properties: Option<DisconnectProperties>,
2693 ) -> Result<(), ClientError> {
2694 let disconnect = AsyncClient::build_disconnect_packet(reason, properties);
2695 self.client
2696 .send_immediate_disconnect(Request::DisconnectNow(disconnect))?;
2697 Ok(())
2698 }
2699
2700 pub fn try_disconnect(&self) -> Result<(), ClientError> {
2718 self.client.try_disconnect()
2719 }
2720
2721 pub fn try_disconnect_with_properties(
2739 &self,
2740 reason: DisconnectReasonCode,
2741 properties: DisconnectProperties,
2742 ) -> Result<(), ClientError> {
2743 self.client.handle_try_disconnect(reason, Some(properties))
2744 }
2745
2746 pub fn try_disconnect_with_timeout(&self, timeout: Duration) -> Result<(), ClientError> {
2769 self.client.try_disconnect_with_timeout(timeout)
2770 }
2771
2772 pub fn try_disconnect_with_properties_timeout(
2795 &self,
2796 reason: DisconnectReasonCode,
2797 properties: DisconnectProperties,
2798 timeout: Duration,
2799 ) -> Result<(), ClientError> {
2800 self.client
2801 .handle_try_disconnect_with_timeout(reason, Some(properties), timeout)
2802 }
2803
2804 pub fn try_disconnect_now(&self) -> Result<(), ClientError> {
2815 self.client.try_disconnect_now()
2816 }
2817
2818 pub fn try_disconnect_now_with_properties(
2829 &self,
2830 reason: DisconnectReasonCode,
2831 properties: DisconnectProperties,
2832 ) -> Result<(), ClientError> {
2833 self.client
2834 .handle_try_disconnect_now(reason, Some(properties))
2835 }
2836}
2837
2838#[must_use]
2839fn empty_topic_without_valid_alias(topic: &str, properties: Option<&PublishProperties>) -> bool {
2840 topic.is_empty()
2841 && properties
2842 .and_then(|props| props.topic_alias)
2843 .unwrap_or_default()
2844 == 0
2845}
2846
2847#[must_use]
2848fn subscribe_has_valid_filters(subscribe: &Subscribe) -> bool {
2849 !subscribe.filters.is_empty()
2850 && subscribe
2851 .filters
2852 .iter()
2853 .all(|filter| valid_filter(&filter.path))
2854}
2855
/// Error returned by [`Connection::recv`] when the event loop poll ends with
/// `ConnectionError::RequestsDone`.
#[derive(Debug, PartialEq, Eq)]
pub struct RecvError;
2859
/// Error returned by [`Connection::try_recv`].
#[derive(Debug, PartialEq, Eq)]
pub enum TryRecvError {
    /// The event loop poll ended with `ConnectionError::RequestsDone`.
    Disconnected,
    /// The event loop poll did not complete on its first poll.
    Empty,
}
2868
/// Error returned by [`Connection::recv_timeout`].
#[derive(Debug, PartialEq, Eq)]
pub enum RecvTimeoutError {
    /// The event loop poll ended with `ConnectionError::RequestsDone`.
    Disconnected,
    /// The event loop poll did not complete within the requested duration.
    Timeout,
}
2877
/// Pairs an [`EventLoop`] with the Tokio [`Runtime`] used to poll it from
/// non-`async` code.
pub struct Connection {
    /// Event loop polled by `recv`, `try_recv` and `recv_timeout`.
    pub eventloop: EventLoop,
    /// Runtime on which the event loop's poll future is driven.
    runtime: Runtime,
}
2883impl Connection {
2884 const fn new(eventloop: EventLoop, runtime: Runtime) -> Self {
2885 Self { eventloop, runtime }
2886 }
2887
2888 #[must_use = "Connection should be iterated over a loop to make progress"]
2895 pub const fn iter(&mut self) -> Iter<'_> {
2896 Iter { connection: self }
2897 }
2898
2899 pub fn recv(&mut self) -> Result<Result<Event, ConnectionError>, RecvError> {
2908 let f = self.eventloop.poll();
2909 let event = self.runtime.block_on(f);
2910
2911 resolve_event(event).ok_or(RecvError)
2912 }
2913
2914 pub fn try_recv(&mut self) -> Result<Result<Event, ConnectionError>, TryRecvError> {
2924 let f = self.eventloop.poll();
2925 let _guard = self.runtime.enter();
2928 let event = f.now_or_never().ok_or(TryRecvError::Empty)?;
2929
2930 resolve_event(event).ok_or(TryRecvError::Disconnected)
2931 }
2932
2933 pub fn recv_timeout(
2944 &mut self,
2945 duration: Duration,
2946 ) -> Result<Result<Event, ConnectionError>, RecvTimeoutError> {
2947 let f = self.eventloop.poll();
2948 let event = self
2949 .runtime
2950 .block_on(async { timeout(duration, f).await })
2951 .map_err(|_| RecvTimeoutError::Timeout)?;
2952
2953 resolve_event(event).ok_or(RecvTimeoutError::Disconnected)
2954 }
2955}
2956
2957fn resolve_event(event: Result<Event, ConnectionError>) -> Option<Result<Event, ConnectionError>> {
2958 match event {
2959 Ok(v) => Some(Ok(v)),
2960 Err(ConnectionError::RequestsDone) => {
2962 trace!("Done with requests");
2963 None
2964 }
2965 Err(e) => Some(Err(e)),
2966 }
2967}
2968
/// Iterator over the events of a mutably borrowed [`Connection`].
pub struct Iter<'a> {
    /// Connection whose `recv` is called for each item.
    connection: &'a mut Connection,
}
2973
2974impl Iterator for Iter<'_> {
2975 type Item = Result<Event, ConnectionError>;
2976
2977 fn next(&mut self) -> Option<Self::Item> {
2978 self.connection.recv().ok()
2979 }
2980}
2981
2982#[cfg(test)]
2983mod test {
2984 use crate::mqttbytes::v5::{
2985 LastWill, PubAckProperties, PubAckReason, PubRecProperties, PubRecReason,
2986 };
2987
2988 use super::*;
2989
2990 #[test]
2991 fn calling_iter_twice_on_connection_shouldnt_panic() {
2992 let mut mqttoptions = MqttOptions::new("test-1", "localhost");
2993 let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false, None);
2994 mqttoptions.set_keep_alive(5).set_last_will(will);
2995
2996 let (_, mut connection) = Client::builder(mqttoptions).capacity(10).build();
2997 let _ = connection.iter();
2998 let _ = connection.iter();
2999 }
3000
3001 #[test]
3002 fn builder_uses_options_request_channel_capacity_by_default() {
3003 let mut mqttoptions = MqttOptions::new("test-1", "localhost");
3004 mqttoptions.set_request_channel_capacity(1);
3005 let builder: AsyncClientBuilder = AsyncClient::builder(mqttoptions);
3006 let (client, _eventloop) = builder.build();
3007
3008 client
3009 .try_publish("hello/world", QoS::AtMostOnce, false, "one")
3010 .expect("first request should fit configured capacity");
3011 assert!(matches!(
3012 client.try_publish("hello/world", QoS::AtMostOnce, false, "two"),
3013 Err(ClientError::TryRequest(request)) if matches!(*request, Request::Publish(_))
3014 ));
3015 }
3016
3017 #[test]
3018 fn sync_and_async_entry_points_return_distinct_builder_types() {
3019 let sync_builder = Client::builder(MqttOptions::new("test-sync", "localhost"));
3020 let async_builder = AsyncClient::builder(MqttOptions::new("test-async", "localhost"));
3021
3022 let _: ClientBuilder = sync_builder;
3023 let _: AsyncClientBuilder = async_builder;
3024 }
3025
3026 #[test]
3027 fn builder_capacity_overrides_options_request_channel_capacity() {
3028 let mut mqttoptions = MqttOptions::new("test-1", "localhost");
3029 mqttoptions.set_request_channel_capacity(1);
3030 let (client, _eventloop) = Client::builder(mqttoptions).capacity(2).build();
3031
3032 client
3033 .try_publish("hello/world", QoS::AtMostOnce, false, "one")
3034 .expect("first request should fit overridden capacity");
3035 client
3036 .try_publish("hello/world", QoS::AtMostOnce, false, "two")
3037 .expect("second request should fit overridden capacity");
3038 assert!(matches!(
3039 client.try_publish("hello/world", QoS::AtMostOnce, false, "three"),
3040 Err(ClientError::TryRequest(request)) if matches!(*request, Request::Publish(_))
3041 ));
3042 }
3043
3044 #[test]
3045 fn builder_capacity_zero_is_bounded_rendezvous() {
3046 let mqttoptions = MqttOptions::new("test-1", "localhost");
3047 let (client, _eventloop) = AsyncClient::builder(mqttoptions).capacity(0).build();
3048
3049 assert!(matches!(
3050 client.try_publish("hello/world", QoS::AtMostOnce, false, "one"),
3051 Err(ClientError::TryRequest(request)) if matches!(*request, Request::Publish(_))
3052 ));
3053 }
3054
3055 #[test]
3056 fn unbounded_builder_allows_try_publish_without_polling() {
3057 let mqttoptions = MqttOptions::new("test-1", "localhost");
3058 let (client, _eventloop) = AsyncClient::builder(mqttoptions).unbounded().build();
3059
3060 for i in 0..128 {
3061 client
3062 .try_publish("hello/world", QoS::AtMostOnce, false, vec![i])
3063 .expect("unbounded channel should accept requests without polling");
3064 }
3065 }
3066
3067 #[tokio::test]
3068 async fn bounded_publish_blocks_when_channel_is_full_without_polling() {
3069 let mqttoptions = MqttOptions::new("test-1", "localhost");
3070 let (client, _eventloop) = AsyncClient::builder(mqttoptions).capacity(1).build();
3071
3072 client
3073 .publish("hello/world", QoS::AtMostOnce, false, "one")
3074 .await
3075 .expect("first request should fit bounded channel");
3076
3077 let result = tokio::time::timeout(
3078 std::time::Duration::from_millis(25),
3079 client.publish("hello/world", QoS::AtMostOnce, false, "two"),
3080 )
3081 .await;
3082 assert!(result.is_err());
3083 }
3084
3085 #[tokio::test]
3086 async fn unbounded_publish_completes_without_polling() {
3087 let mqttoptions = MqttOptions::new("test-1", "localhost");
3088 let (client, _eventloop) = AsyncClient::builder(mqttoptions).unbounded().build();
3089
3090 for i in 0..128 {
3091 client
3092 .publish("hello/world", QoS::AtMostOnce, false, vec![i])
3093 .await
3094 .expect("unbounded channel should accept requests without polling");
3095 }
3096 }
3097
3098 #[test]
3099 fn should_be_able_to_build_test_client_from_channel() {
3100 let (tx, rx) = flume::bounded(1);
3101 let client = Client::from_sender(tx);
3102 client
3103 .publish("hello/world", QoS::ExactlyOnce, false, "good bye")
3104 .expect("Should be able to publish");
3105 let _ = rx.try_recv().expect("Should have message");
3106 }
3107
3108 #[test]
3109 fn prepare_ack_maps_qos_to_manual_ack_packets_v5() {
3110 let (tx, _) = flume::bounded(1);
3111 let client = Client::from_sender(tx);
3112
3113 let qos0 = Publish::new("hello/world", QoS::AtMostOnce, vec![1], None);
3114 assert!(client.prepare_ack(&qos0).is_none());
3115
3116 let mut qos1 = Publish::new("hello/world", QoS::AtLeastOnce, vec![1], None);
3117 qos1.pkid = 7;
3118 match client.prepare_ack(&qos1) {
3119 Some(ManualAck::PubAck(ack)) => assert_eq!(ack.pkid, 7),
3120 ack => panic!("expected QoS1 PubAck, got {ack:?}"),
3121 }
3122
3123 let mut qos2 = Publish::new("hello/world", QoS::ExactlyOnce, vec![1], None);
3124 qos2.pkid = 9;
3125 match client.prepare_ack(&qos2) {
3126 Some(ManualAck::PubRec(ack)) => assert_eq!(ack.pkid, 9),
3127 ack => panic!("expected QoS2 PubRec, got {ack:?}"),
3128 }
3129 }
3130
3131 #[test]
3132 fn manual_ack_sends_custom_puback_reason_and_properties() {
3133 let (tx, rx) = flume::bounded(1);
3134 let client = Client::from_sender(tx);
3135
3136 let expected_properties = PubAckProperties {
3137 reason_string: Some("no downstream subscribers".to_owned()),
3138 user_properties: vec![("source".to_owned(), "unit-test".to_owned())],
3139 };
3140 let mut ack = PubAck::new(41, None);
3141 ack.reason = PubAckReason::NoMatchingSubscribers;
3142 ack.properties = Some(expected_properties.clone());
3143
3144 client
3145 .manual_ack(ManualAck::PubAck(ack))
3146 .expect("manual_ack should send request");
3147
3148 let request = rx.try_recv().expect("Should have ack request");
3149 match request {
3150 Request::PubAck(ack) => {
3151 assert_eq!(ack.pkid, 41);
3152 assert_eq!(ack.reason, PubAckReason::NoMatchingSubscribers);
3153 assert_eq!(ack.properties, Some(expected_properties));
3154 }
3155 request => panic!("Expected PubAck request, got {request:?}"),
3156 }
3157 }
3158
3159 #[test]
3160 fn try_manual_ack_sends_custom_pubrec_reason_and_properties() {
3161 let (tx, rx) = flume::bounded(1);
3162 let client = Client::from_sender(tx);
3163
3164 let expected_properties = PubRecProperties {
3165 reason_string: Some("queued for qos2 flow".to_owned()),
3166 user_properties: vec![("source".to_owned(), "unit-test".to_owned())],
3167 };
3168 let mut ack = PubRec::new(52, None);
3169 ack.reason = PubRecReason::ImplementationSpecificError;
3170 ack.properties = Some(expected_properties.clone());
3171
3172 client
3173 .try_manual_ack(ManualAck::PubRec(ack))
3174 .expect("try_manual_ack should send request");
3175
3176 let request = rx.try_recv().expect("Should have ack request");
3177 match request {
3178 Request::PubRec(ack) => {
3179 assert_eq!(ack.pkid, 52);
3180 assert_eq!(ack.reason, PubRecReason::ImplementationSpecificError);
3181 assert_eq!(ack.properties, Some(expected_properties));
3182 }
3183 request => panic!("Expected PubRec request, got {request:?}"),
3184 }
3185 }
3186
3187 #[test]
3188 fn ack_and_try_ack_send_default_success_packets_v5() {
3189 let (tx, rx) = flume::bounded(2);
3190 let client = Client::from_sender(tx);
3191
3192 let mut qos1 = Publish::new("hello/world", QoS::AtLeastOnce, vec![1], None);
3193 qos1.pkid = 11;
3194 client.ack(&qos1).expect("ack should send PubAck");
3195
3196 let mut qos2 = Publish::new("hello/world", QoS::ExactlyOnce, vec![1], None);
3197 qos2.pkid = 13;
3198 client
3199 .try_ack(&qos2)
3200 .expect("try_ack should send PubRec request");
3201
3202 let first = rx.try_recv().expect("Should receive first ack request");
3203 match first {
3204 Request::PubAck(ack) => {
3205 assert_eq!(ack.pkid, 11);
3206 assert_eq!(ack.reason, PubAckReason::Success);
3207 assert_eq!(ack.properties, None);
3208 }
3209 request => panic!("Expected PubAck request, got {request:?}"),
3210 }
3211
3212 let second = rx.try_recv().expect("Should receive second ack request");
3213 match second {
3214 Request::PubRec(ack) => {
3215 assert_eq!(ack.pkid, 13);
3216 assert_eq!(ack.reason, PubRecReason::Success);
3217 assert_eq!(ack.properties, None);
3218 }
3219 request => panic!("Expected PubRec request, got {request:?}"),
3220 }
3221 }
3222
/// Both `reauth` and `try_reauth` accept AUTH properties, and each call is
/// followed by an event becoming available on the connection iterator.
#[test]
fn test_reauth() {
    let options = MqttOptions::new("test-1", "localhost");
    let (client, mut connection) = Client::builder(options).capacity(10).build();

    let auth_props = AuthProperties {
        method: Some(String::from("test")),
        data: Some(Bytes::from("test")),
        reason: None,
        user_properties: Vec::new(),
    };

    // Blocking variant; the properties are cloned so they can be reused below.
    let blocking_outcome = client.reauth(Some(auth_props.clone()));
    blocking_outcome.expect("Should be able to reauth");
    let _ = connection.iter().next().expect("Should have event");

    // Non-blocking variant consumes the original properties.
    let non_blocking_outcome = client.try_reauth(Some(auth_props));
    non_blocking_outcome.expect("Should be able to reauth");
    let _ = connection.iter().next().expect("Should have event");
}
3244
/// A pre-validated topic can be handed straight to the blocking `publish`.
#[test]
fn can_publish_with_validated_topic() {
    let (sender, receiver) = flume::bounded(1);
    let client = Client::from_sender(sender);

    let topic = ValidatedTopic::new("hello/world").unwrap();
    let outcome = client.publish(topic, QoS::ExactlyOnce, false, "good bye");
    outcome.expect("Should be able to publish");

    // The publish must have been placed on the request channel.
    let _ = receiver.try_recv().expect("Should have message");
}
3255
/// `publish` and `try_publish` both accept a `&String` topic, leaving the
/// caller's string usable for the second call.
#[test]
fn publish_accepts_borrowed_string_topic() {
    let (sender, receiver) = flume::bounded(2);
    let client = Client::from_sender(sender);
    let owned_topic = String::from("hello/world");

    let blocking_outcome = client.publish(&owned_topic, QoS::ExactlyOnce, false, "good bye");
    blocking_outcome.expect("Should be able to publish");

    let non_blocking_outcome =
        client.try_publish(&owned_topic, QoS::ExactlyOnce, false, "good bye");
    non_blocking_outcome.expect("Should be able to publish");

    // One queued request per publish call.
    for _ in 0..2 {
        let _ = receiver.try_recv().expect("Should have message");
    }
}
3270
/// Topics may be supplied as either `Cow::Borrowed` or `Cow::Owned`.
#[test]
fn publish_accepts_cow_topic_variants() {
    use std::borrow::Cow;

    let (sender, receiver) = flume::bounded(2);
    let client = Client::from_sender(sender);

    // Borrowed variant through the blocking path.
    let borrowed_outcome = client.publish(
        Cow::Borrowed("hello/world"),
        QoS::ExactlyOnce,
        false,
        "good bye",
    );
    borrowed_outcome.expect("Should be able to publish");

    // Owned variant through the non-blocking path.
    let owned_outcome = client.try_publish(
        Cow::Owned("hello/world".to_owned()),
        QoS::ExactlyOnce,
        false,
        "good bye",
    );
    owned_outcome.expect("Should be able to publish");

    for _ in 0..2 {
        let _ = receiver.try_recv().expect("Should have message");
    }
}
3294
/// Publishing a `Cow` topic that contains a wildcard (`a/+/b`) must be
/// rejected with `ClientError::Request` wrapping the publish, and nothing may
/// be placed on the request channel.
#[test]
fn publishing_invalid_cow_topic_fails() {
    // Keep the receiver alive. Discarding it with `_` closes the channel at
    // the end of the `let`, so every send would fail no matter what the topic
    // is and the test could pass without the topic ever being validated
    // (presumably with the same error shape — confirm against the
    // `SendError` -> `ClientError` mapping).
    let (tx, rx) = flume::bounded(1);
    let client = Client::from_sender(tx);
    let err = client
        .publish(
            std::borrow::Cow::Borrowed("a/+/b"),
            QoS::ExactlyOnce,
            false,
            "good bye",
        )
        .expect_err("Invalid publish topic should fail");
    assert!(matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_))));

    // The rejected publish must never reach the event loop's queue.
    assert!(rx.is_empty(), "invalid topic must not be enqueued");
}
3309
/// `ValidatedTopic` supports cloning and equality comparison.
#[test]
fn validated_topic_ergonomics() {
    let original = ValidatedTopic::new("hello/world").unwrap();
    let duplicate = original.clone();
    // A clone compares equal to the value it was made from.
    assert_eq!(original, duplicate);
}
3317
/// Constructing a `ValidatedTopic` from a wildcard topic fails and the error
/// carries the offending topic string.
#[test]
fn creating_invalid_validated_topic_fails() {
    let rejected = ValidatedTopic::new("a/+/b");
    let expected: Result<ValidatedTopic, InvalidTopic> =
        Err(InvalidTopic("a/+/b".to_string()));
    assert_eq!(rejected, expected);
}
3325
/// The blocking `publish_with_properties` accepts a pre-validated topic.
#[test]
fn publish_with_properties_accepts_validated_topic() {
    let (sender, receiver) = flume::bounded(1);
    let client = Client::from_sender(sender);

    let topic = ValidatedTopic::new("hello/world").unwrap();
    let properties = PublishProperties::default();
    let outcome =
        client.publish_with_properties(topic, QoS::ExactlyOnce, false, "good bye", properties);
    outcome.expect("Should be able to publish");

    let _ = receiver.try_recv().expect("Should have message");
}
3342
/// An empty topic is rejected by `publish_with_properties` both when no topic
/// alias is set and when the alias is `0`. In each case the error is
/// `ClientError::Request` wrapping the publish and nothing is enqueued.
#[test]
fn publish_with_properties_empty_topic_requires_nonzero_alias() {
    // Keep the receiver alive. Discarding it with `_` closes the channel at
    // the end of the `let`, so every send would fail regardless of validation
    // and these assertions could pass without the alias rule being enforced
    // (presumably with the same error shape — confirm against the
    // `SendError` -> `ClientError` mapping).
    let (tx, rx) = flume::bounded(1);
    let client = Client::from_sender(tx);

    // No topic alias at all.
    let err = client
        .publish_with_properties(
            "",
            QoS::AtMostOnce,
            false,
            "good bye",
            PublishProperties::default(),
        )
        .expect_err("Empty topic without topic alias should fail");
    assert!(matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_))));

    // Topic alias present but zero.
    let err = client
        .publish_with_properties(
            "",
            QoS::AtMostOnce,
            false,
            "good bye",
            PublishProperties {
                topic_alias: Some(0),
                ..Default::default()
            },
        )
        .expect_err("Empty topic with topic alias 0 should fail");
    assert!(matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_))));

    // Neither rejected publish may have reached the request channel.
    assert!(rx.is_empty(), "rejected publishes must not be enqueued");
}
3373
/// An empty topic is accepted when the properties carry a non-zero topic
/// alias; the queued publish keeps both the empty topic and the alias.
#[test]
fn publish_with_properties_empty_topic_accepts_nonzero_alias() {
    let (sender, receiver) = flume::bounded(1);
    let client = Client::from_sender(sender);

    let aliased = PublishProperties {
        topic_alias: Some(1),
        ..Default::default()
    };
    let outcome = client.publish_with_properties("", QoS::AtMostOnce, false, "good bye", aliased);
    outcome.expect("Empty topic with non-zero topic alias should be accepted");

    match receiver.try_recv().expect("Should have message") {
        Request::Publish(publish) => {
            assert!(publish.topic.is_empty());
            let queued_alias = publish
                .properties
                .as_ref()
                .and_then(|properties| properties.topic_alias);
            assert_eq!(queued_alias, Some(1));
        }
        request => panic!("Expected Publish request, got {request:?}"),
    }
}
3407
/// The non-blocking `try_publish` accepts a pre-validated topic.
#[test]
fn try_publish_accepts_validated_topic() {
    let (sender, receiver) = flume::bounded(1);
    let client = Client::from_sender(sender);

    let topic = ValidatedTopic::new("hello/world").unwrap();
    let outcome = client.try_publish(topic, QoS::ExactlyOnce, false, "good bye");
    outcome.expect("Should be able to publish");

    let _ = receiver.try_recv().expect("Should have message");
}
3418
/// The non-blocking `try_publish_with_properties` accepts a pre-validated
/// topic.
#[test]
fn try_publish_with_properties_accepts_validated_topic() {
    let (sender, receiver) = flume::bounded(1);
    let client = Client::from_sender(sender);

    let topic = ValidatedTopic::new("hello/world").unwrap();
    let properties = PublishProperties::default();
    let outcome = client.try_publish_with_properties(
        topic,
        QoS::ExactlyOnce,
        false,
        "good bye",
        properties,
    );
    outcome.expect("Should be able to publish");

    let _ = receiver.try_recv().expect("Should have message");
}
3435
/// `try_publish_with_properties` rejects an empty topic that has no topic
/// alias with `ClientError::TryRequest` wrapping the publish, and nothing is
/// enqueued.
#[test]
fn try_publish_with_properties_empty_topic_requires_nonzero_alias() {
    // Keep the receiver alive. Discarding it with `_` closes the channel at
    // the end of the `let`, so a `try_send` would fail regardless of
    // validation and the assertion could pass without the alias rule being
    // enforced (presumably with the same error shape — confirm against the
    // `TrySendError` -> `ClientError` mapping).
    let (tx, rx) = flume::bounded(1);
    let client = Client::from_sender(tx);

    let err = client
        .try_publish_with_properties(
            "",
            QoS::AtMostOnce,
            false,
            "good bye",
            PublishProperties::default(),
        )
        .expect_err("Empty topic without topic alias should fail");
    assert!(matches!(err, ClientError::TryRequest(req) if matches!(*req, Request::Publish(_))));

    // The rejected publish must never reach the request channel.
    assert!(rx.is_empty(), "rejected publish must not be enqueued");
}
3452
/// Publishing a raw `&str` topic that contains a wildcard (`a/+/b`) must be
/// rejected with `ClientError::Request` wrapping the publish, and nothing may
/// be placed on the request channel.
#[test]
fn publishing_invalid_raw_topic_fails() {
    // Keep the receiver alive. Discarding it with `_` closes the channel at
    // the end of the `let`, so every send would fail no matter what the topic
    // is and the test could pass without the topic ever being validated
    // (presumably with the same error shape — confirm against the
    // `SendError` -> `ClientError` mapping).
    let (tx, rx) = flume::bounded(1);
    let client = Client::from_sender(tx);
    let err = client
        .publish("a/+/b", QoS::ExactlyOnce, false, "good bye")
        .expect_err("Invalid publish topic should fail");
    assert!(matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_))));

    // The rejected publish must never reach the event loop's queue.
    assert!(rx.is_empty(), "invalid topic must not be enqueued");
}
3462
/// All four awaiting publish entry points on `AsyncClient` (`publish`,
/// `publish_with_properties`, `publish_bytes`,
/// `publish_bytes_with_properties`) accept a pre-validated topic.
#[test]
fn async_publish_paths_accept_validated_topic() {
    // One channel slot per publish call below.
    let (sender, receiver) = flume::bounded(4);
    let client = AsyncClient::from_senders(sender);
    let rt = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    // Fresh validated topic for every call, since each call consumes it.
    let fresh_topic = || ValidatedTopic::new("hello/world").unwrap();

    rt.block_on(async {
        let plain = client
            .publish(fresh_topic(), QoS::ExactlyOnce, false, "good bye")
            .await;
        plain.expect("Should be able to publish");

        let with_properties = client
            .publish_with_properties(
                fresh_topic(),
                QoS::ExactlyOnce,
                false,
                "good bye",
                PublishProperties::default(),
            )
            .await;
        with_properties.expect("Should be able to publish");

        let bytes = client
            .publish_bytes(
                fresh_topic(),
                QoS::ExactlyOnce,
                false,
                Bytes::from_static(b"good bye"),
            )
            .await;
        bytes.expect("Should be able to publish");

        let bytes_with_properties = client
            .publish_bytes_with_properties(
                fresh_topic(),
                QoS::ExactlyOnce,
                false,
                Bytes::from_static(b"good bye"),
                PublishProperties::default(),
            )
            .await;
        bytes_with_properties.expect("Should be able to publish");
    });

    // Every call above must have queued exactly one request.
    for _ in 0..4 {
        let _ = receiver.try_recv().expect("Should have message");
    }
}
3521
/// The non-awaiting `try_publish` and `try_publish_with_properties` on
/// `AsyncClient` accept a pre-validated topic.
#[test]
fn async_try_publish_paths_accept_validated_topic() {
    let (sender, receiver) = flume::bounded(4);
    let client = AsyncClient::from_senders(sender);

    let first_topic = ValidatedTopic::new("hello/world").unwrap();
    let plain = client.try_publish(first_topic, QoS::ExactlyOnce, false, "good bye");
    plain.expect("Should be able to publish");

    let second_topic = ValidatedTopic::new("hello/world").unwrap();
    let with_properties = client.try_publish_with_properties(
        second_topic,
        QoS::ExactlyOnce,
        false,
        "good bye",
        PublishProperties::default(),
    );
    with_properties.expect("Should be able to publish");

    // One queued request per call.
    for _ in 0..2 {
        let _ = receiver.try_recv().expect("Should have message");
    }
}
3549
/// The awaiting publish paths on `AsyncClient` reject invalid topics: a
/// wildcard topic through `publish` and `publish_bytes`, and an empty topic
/// without a topic alias through `publish_with_properties`. Each failure is
/// `ClientError::Request` wrapping the publish, and nothing is enqueued.
#[test]
fn async_publishing_invalid_raw_topic_fails() {
    // Keep the receiver alive. Discarding it with `_` closes the channel at
    // the end of the `let`, so every send would fail regardless of validation
    // and these assertions could pass without any topic being checked
    // (presumably with the same error shape — confirm against the
    // `SendError` -> `ClientError` mapping).
    let (tx, rx) = flume::bounded(1);
    let client = AsyncClient::from_senders(tx);
    let runtime = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    runtime.block_on(async {
        let err = client
            .publish("a/+/b", QoS::ExactlyOnce, false, "good bye")
            .await
            .expect_err("Invalid publish topic should fail");
        assert!(
            matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_)))
        );

        let err = client
            .publish_bytes(
                "a/+/b",
                QoS::ExactlyOnce,
                false,
                Bytes::from_static(b"good bye"),
            )
            .await
            .expect_err("Invalid publish topic should fail");
        assert!(
            matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_)))
        );

        let err = client
            .publish_with_properties(
                "",
                QoS::AtMostOnce,
                false,
                "good bye",
                PublishProperties::default(),
            )
            .await
            .expect_err("Empty topic without topic alias should fail");
        assert!(
            matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_)))
        );
    });

    // None of the rejected publishes may have reached the request channel.
    assert!(rx.is_empty(), "rejected publishes must not be enqueued");
}
3596
/// The blocking `disconnect_with_properties` enqueues a `Disconnect` request
/// carrying the given reason code and properties unchanged.
#[test]
fn disconnect_with_properties_builds_disconnect_request() {
    let (sender, receiver) = flume::bounded(1);
    let client = Client::from_sender(sender);

    let sent_properties = DisconnectProperties {
        session_expiry_interval: Some(120),
        reason_string: Some(String::from("closing")),
        user_properties: vec![(String::from("source"), String::from("test"))],
        server_reference: Some(String::from("backup-broker")),
    };
    // Snapshot for the comparison, since the call consumes the properties.
    let expected_properties = Some(sent_properties.clone());

    let outcome = client.disconnect_with_properties(
        DisconnectReasonCode::ImplementationSpecificError,
        sent_properties,
    );
    outcome.expect("disconnect_with_properties should enqueue request");

    match receiver.try_recv().expect("Should have disconnect request") {
        Request::Disconnect(disconnect) => {
            assert_eq!(
                disconnect.reason_code,
                DisconnectReasonCode::ImplementationSpecificError
            );
            assert_eq!(disconnect.properties, expected_properties);
        }
        request => panic!("Expected disconnect request, got {request:?}"),
    }
}
3627
/// The non-blocking `try_disconnect_with_properties` enqueues a `Disconnect`
/// request carrying the given reason code and properties unchanged.
#[test]
fn try_disconnect_with_properties_builds_disconnect_request() {
    let (sender, receiver) = flume::bounded(1);
    let client = Client::from_sender(sender);

    let sent_properties = DisconnectProperties {
        session_expiry_interval: Some(360),
        reason_string: Some(String::from("maintenance")),
        user_properties: vec![(String::from("env"), String::from("test"))],
        server_reference: None,
    };
    // Snapshot for the comparison, since the call consumes the properties.
    let expected_properties = Some(sent_properties.clone());

    let outcome = client.try_disconnect_with_properties(
        DisconnectReasonCode::ServerShuttingDown,
        sent_properties,
    );
    outcome.expect("try_disconnect_with_properties should enqueue request");

    match receiver.try_recv().expect("Should have disconnect request") {
        Request::Disconnect(disconnect) => {
            assert_eq!(
                disconnect.reason_code,
                DisconnectReasonCode::ServerShuttingDown
            );
            assert_eq!(disconnect.properties, expected_properties);
        }
        request => panic!("Expected disconnect request, got {request:?}"),
    }
}
3658
/// The awaiting `disconnect_with_properties` on `AsyncClient` enqueues a
/// `Disconnect` request carrying the given reason code and properties
/// unchanged.
#[test]
fn async_disconnect_with_properties_builds_disconnect_request() {
    let (sender, receiver) = flume::bounded(1);
    let client = AsyncClient::from_senders(sender);
    let rt = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    let sent_properties = DisconnectProperties {
        session_expiry_interval: Some(42),
        reason_string: Some(String::from("done")),
        user_properties: vec![(String::from("k"), String::from("v"))],
        server_reference: Some(String::from("fallback")),
    };
    // Snapshot for the comparison, since the call consumes the properties.
    let expected_properties = Some(sent_properties.clone());

    // Drive the future to completion on the single-threaded runtime.
    let outcome = rt.block_on(
        client.disconnect_with_properties(DisconnectReasonCode::UseAnotherServer, sent_properties),
    );
    outcome.expect("disconnect_with_properties should enqueue request");

    match receiver.try_recv().expect("Should have disconnect request") {
        Request::Disconnect(disconnect) => {
            assert_eq!(
                disconnect.reason_code,
                DisconnectReasonCode::UseAnotherServer
            );
            assert_eq!(disconnect.properties, expected_properties);
        }
        request => panic!("Expected disconnect request, got {request:?}"),
    }
}
3696
/// The non-awaiting `try_disconnect_with_properties` on `AsyncClient`
/// enqueues a `Disconnect` request carrying the given reason code and
/// properties unchanged.
#[test]
fn async_try_disconnect_with_properties_builds_disconnect_request() {
    let (sender, receiver) = flume::bounded(1);
    let client = AsyncClient::from_senders(sender);

    let sent_properties = DisconnectProperties {
        session_expiry_interval: Some(7),
        reason_string: Some(String::from("bye")),
        user_properties: vec![(String::from("actor"), String::from("test"))],
        server_reference: None,
    };
    // Snapshot for the comparison, since the call consumes the properties.
    let expected_properties = Some(sent_properties.clone());

    let outcome = client.try_disconnect_with_properties(
        DisconnectReasonCode::AdministrativeAction,
        sent_properties,
    );
    outcome.expect("try_disconnect_with_properties should enqueue request");

    match receiver.try_recv().expect("Should have disconnect request") {
        Request::Disconnect(disconnect) => {
            assert_eq!(
                disconnect.reason_code,
                DisconnectReasonCode::AdministrativeAction
            );
            assert_eq!(disconnect.properties, expected_properties);
        }
        request => panic!("Expected disconnect request, got {request:?}"),
    }
}
3727
/// Every `*_tracked` entry point on an `AsyncClient` built with
/// `from_senders` must fail with `ClientError::TrackingUnavailable`, for both
/// the awaiting and the `try_` variants.
#[test]
fn tracked_publish_requires_tracking_channel() {
    let (sender, _) = flume::bounded(2);
    let client = AsyncClient::from_senders(sender);
    let rt = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    // Awaiting variants.
    rt.block_on(async {
        let publish_outcome = client
            .publish_tracked("hello/world", QoS::AtLeastOnce, false, "good bye")
            .await;
        assert!(matches!(
            publish_outcome.expect_err("tracked publish should fail without tracked channel"),
            ClientError::TrackingUnavailable
        ));

        let publish_bytes_outcome = client
            .publish_bytes_tracked(
                "hello/world",
                QoS::AtLeastOnce,
                false,
                Bytes::from_static(b"good bye"),
            )
            .await;
        assert!(matches!(
            publish_bytes_outcome
                .expect_err("tracked publish bytes should fail without tracked channel"),
            ClientError::TrackingUnavailable
        ));

        let subscribe_outcome = client
            .subscribe_tracked("hello/world", QoS::AtLeastOnce)
            .await;
        assert!(matches!(
            subscribe_outcome.expect_err("tracked subscribe should fail without tracked channel"),
            ClientError::TrackingUnavailable
        ));

        let subscribe_many_outcome = client
            .subscribe_many_tracked(vec![Filter::new("hello/world", QoS::AtLeastOnce)])
            .await;
        assert!(matches!(
            subscribe_many_outcome
                .expect_err("tracked subscribe many should fail without tracked channel"),
            ClientError::TrackingUnavailable
        ));

        let unsubscribe_outcome = client.unsubscribe_tracked("hello/world").await;
        assert!(matches!(
            unsubscribe_outcome
                .expect_err("tracked unsubscribe should fail without tracked channel"),
            ClientError::TrackingUnavailable
        ));
    });

    // Non-awaiting `try_` variants.
    let try_subscribe_outcome = client.try_subscribe_tracked("hello/world", QoS::AtLeastOnce);
    assert!(matches!(
        try_subscribe_outcome
            .expect_err("tracked try_subscribe should fail without tracked channel"),
        ClientError::TrackingUnavailable
    ));

    let try_subscribe_many_outcome =
        client.try_subscribe_many_tracked(vec![Filter::new("hello/world", QoS::AtLeastOnce)]);
    assert!(matches!(
        try_subscribe_many_outcome
            .expect_err("tracked try_subscribe_many should fail without tracked channel"),
        ClientError::TrackingUnavailable
    ));

    let try_unsubscribe_outcome = client.try_unsubscribe_tracked("hello/world");
    assert!(matches!(
        try_unsubscribe_outcome
            .expect_err("tracked try_unsubscribe should fail without tracked channel"),
        ClientError::TrackingUnavailable
    ));
}
3789
/// A tracked unsubscribe must be sent on the control request channel, leaving
/// the regular request channel empty.
#[test]
fn tracked_unsubscribe_uses_control_request_channel() {
    let (data_tx, data_rx) = flume::bounded(1);
    let (control_tx, control_rx) = flume::bounded(1);
    let (disconnect_tx, _disconnect_rx) = flume::unbounded();

    let client = AsyncClient {
        request_tx: RequestSender::WithNotice {
            requests: data_tx,
            control_requests: control_tx,
            immediate_disconnect: disconnect_tx,
        },
    };
    let rt = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    rt.block_on(client.unsubscribe_tracked("hello/world"))
        .expect("tracked unsubscribe should enqueue");

    // Nothing on the regular channel; the envelope sits on the control channel.
    assert!(data_rx.is_empty());
    let envelope = control_rx
        .try_recv()
        .expect("tracked unsubscribe should use control channel");
    let request = envelope.into_parts().0;
    assert!(matches!(request, Request::Unsubscribe(_)));
}
3817
/// A tracked re-authentication must be sent on the control request channel,
/// leaving the regular request channel empty.
#[test]
fn tracked_auth_uses_control_request_channel() {
    let (data_tx, data_rx) = flume::bounded(1);
    let (control_tx, control_rx) = flume::bounded(1);
    let (disconnect_tx, _disconnect_rx) = flume::unbounded();

    let client = AsyncClient {
        request_tx: RequestSender::WithNotice {
            requests: data_tx,
            control_requests: control_tx,
            immediate_disconnect: disconnect_tx,
        },
    };
    let rt = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    rt.block_on(client.reauth_tracked(None))
        .expect("tracked auth should enqueue");

    // Nothing on the regular channel; the envelope sits on the control channel.
    assert!(data_rx.is_empty());
    let envelope = control_rx
        .try_recv()
        .expect("tracked auth should use control channel");
    let request = envelope.into_parts().0;
    assert!(matches!(request, Request::Auth(_)));
}
3845}