Skip to main content

rumqttc/
client.rs

//! This module offers high-level synchronous and asynchronous abstractions over
//! the async event loop.
3use std::borrow::Cow;
4use std::time::Duration;
5
6use super::eventloop::{RequestChannelCapacity, RequestEnvelope};
7use super::mqttbytes::QoS;
8use super::mqttbytes::v5::{
9    Auth, AuthProperties, AuthReasonCode, Filter, PubAck, PubRec, Publish, PublishProperties,
10    Subscribe, SubscribeProperties, Unsubscribe, UnsubscribeProperties,
11};
12use super::{
13    ConnectionError, Disconnect, DisconnectProperties, DisconnectReasonCode, Event, EventLoop,
14    MqttOptions, Request,
15};
16use crate::notice::{AuthNoticeTx, PublishNoticeTx, SubscribeNoticeTx, UnsubscribeNoticeTx};
17use crate::{
18    AuthNotice, PublishNotice, SubscribeNotice, UnsubscribeNotice, valid_filter, valid_topic,
19};
20
21use bytes::Bytes;
22use flume::{SendError, Sender, TrySendError};
23use futures_util::FutureExt;
24use tokio::runtime::{self, Runtime};
25use tokio::time::timeout;
26
27/// An error returned when a topic string fails validation against the MQTT specification.
28#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)]
29#[error("Invalid MQTT topic: '{0}'")]
30pub struct InvalidTopic(String);
31
/// Newtype around a `String` that is known to be a valid MQTT topic.
///
/// The only way to obtain a value is [`ValidatedTopic::new`], which checks the
/// topic once. Holding on to the result avoids paying for validation again
/// each time the same topic is used.
///
/// Pass it to the publish APIs that accept [`PublishTopic`] when publishing
/// repeatedly to one topic, so those calls skip per-call validation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ValidatedTopic(String);
42
43impl ValidatedTopic {
44    /// Constructs a new `ValidatedTopic` after validating the input string.
45    ///
46    /// # Errors
47    ///
48    /// Returns [`InvalidTopic`] if the topic string does not conform to the MQTT specification.
49    pub fn new<S: Into<String>>(topic: S) -> Result<Self, InvalidTopic> {
50        let topic_string = topic.into();
51        if valid_topic(&topic_string) {
52            Ok(Self(topic_string))
53        } else {
54            Err(InvalidTopic(topic_string))
55        }
56    }
57}
58
59impl From<ValidatedTopic> for String {
60    fn from(topic: ValidatedTopic) -> Self {
61        topic.0
62    }
63}
64
65/// Topic argument accepted by publish APIs.
66///
67/// `ValidatedTopic` variants skip per-call validation while string variants are
68/// validated for MQTT topic correctness.
69pub enum PublishTopic {
70    /// Raw topic input that must be validated before publishing.
71    Unvalidated(String),
72    /// Topic that has already been validated once.
73    Validated(ValidatedTopic),
74}
75
76impl PublishTopic {
77    fn into_string_and_validation(self) -> (String, bool) {
78        match self {
79            Self::Unvalidated(topic) => (topic, true),
80            Self::Validated(topic) => (topic.0, false),
81        }
82    }
83}
84
85impl From<ValidatedTopic> for PublishTopic {
86    fn from(topic: ValidatedTopic) -> Self {
87        Self::Validated(topic)
88    }
89}
90
91impl From<String> for PublishTopic {
92    fn from(topic: String) -> Self {
93        Self::Unvalidated(topic)
94    }
95}
96
97impl From<&str> for PublishTopic {
98    fn from(topic: &str) -> Self {
99        Self::Unvalidated(topic.to_owned())
100    }
101}
102
103impl From<&String> for PublishTopic {
104    fn from(topic: &String) -> Self {
105        Self::Unvalidated(topic.clone())
106    }
107}
108
109impl From<Cow<'_, str>> for PublishTopic {
110    fn from(topic: Cow<'_, str>) -> Self {
111        Self::Unvalidated(topic.into_owned())
112    }
113}
114
115/// Client Error
116#[derive(Debug, thiserror::Error)]
117pub enum ClientError {
118    #[error("Failed to send mqtt requests to eventloop")]
119    Request(Box<Request>),
120    #[error("Failed to send mqtt requests to eventloop")]
121    TryRequest(Box<Request>),
122    #[error("Tracked request API is unavailable for this client instance")]
123    TrackingUnavailable,
124}
125
126impl From<SendError<Request>> for ClientError {
127    fn from(e: SendError<Request>) -> Self {
128        Self::Request(Box::new(e.into_inner()))
129    }
130}
131
132impl From<TrySendError<Request>> for ClientError {
133    fn from(e: TrySendError<Request>) -> Self {
134        Self::TryRequest(Box::new(e.into_inner()))
135    }
136}
137
138#[derive(Clone, Debug)]
139enum RequestSender {
140    Plain(Sender<Request>),
141    WithNotice {
142        requests: Sender<RequestEnvelope>,
143        control_requests: Sender<RequestEnvelope>,
144        immediate_disconnect: Sender<RequestEnvelope>,
145    },
146}
147
148fn into_request(envelope: RequestEnvelope) -> Request {
149    let (request, _notice) = envelope.into_parts();
150    request
151}
152
153fn map_send_envelope_error(err: SendError<RequestEnvelope>) -> ClientError {
154    ClientError::Request(Box::new(into_request(err.into_inner())))
155}
156
157fn map_try_send_envelope_error(err: TrySendError<RequestEnvelope>) -> ClientError {
158    match err {
159        TrySendError::Full(envelope) | TrySendError::Disconnected(envelope) => {
160            ClientError::TryRequest(Box::new(into_request(envelope)))
161        }
162    }
163}
164
165const fn is_publish_request(request: &Request) -> bool {
166    matches!(request, Request::Publish(_))
167}
168
169/// Prepared acknowledgement packet for manual acknowledgement mode.
170#[derive(Clone, Debug, PartialEq, Eq)]
171pub enum ManualAck {
172    PubAck(PubAck),
173    PubRec(PubRec),
174}
175
176impl ManualAck {
177    fn into_request(self) -> Request {
178        match self {
179            Self::PubAck(ack) => Request::PubAck(ack),
180            Self::PubRec(rec) => Request::PubRec(rec),
181        }
182    }
183}
184
185/// An asynchronous client, communicates with MQTT `EventLoop`.
186///
187/// This is cloneable and can be used to asynchronously [`publish`](`AsyncClient::publish`),
188/// [`subscribe`](`AsyncClient::subscribe`) through the `EventLoop`, which is to be polled parallelly.
189///
190/// **NOTE**: The `EventLoop` must be regularly polled in order to send, receive and process packets
191/// from the broker, i.e. move ahead.
192///
193/// Bounded clients apply backpressure through the client request channel. If the
194/// same task that drives [`EventLoop::poll`](crate::EventLoop::poll) awaits
195/// request-sending APIs such as [`publish`](Self::publish),
196/// [`subscribe`](Self::subscribe), [`unsubscribe`](Self::unsubscribe), or
197/// [`ack`](Self::ack) while that channel is full, it can self-block: the send is
198/// waiting for the event loop to read a request, but the event loop cannot make
199/// progress until that same task polls it again. For bounded async clients,
200/// prefer driving the event loop in a dedicated task. Use [`try_publish`](Self::try_publish)
201/// when dropping outgoing publishes under overload is intended.
202///
203/// The request channel is an admission queue, not a strict global wire FIFO
204/// guarantee. Under publish flow-control pressure, non-`PUBLISH` control
205/// packets can pass earlier `QoS` 1/ `QoS` 2 publishes that are not currently
206/// sendable. Application publishes preserve FIFO with other publishes.
207#[derive(Clone, Debug)]
208pub struct AsyncClient {
209    request_tx: RequestSender,
210}
211
212/// Builder for synchronous MQTT clients.
213///
214/// The request channel is bounded by default using
215/// [`MqttOptions::request_channel_capacity`]. Use [`Self::capacity`] to override
216/// the bounded capacity, or [`Self::unbounded`] to opt into an unbounded request channel.
217#[derive(Debug)]
218pub struct ClientBuilder {
219    options: MqttOptions,
220    capacity: RequestChannelCapacity,
221}
222
223/// Builder for asynchronous MQTT clients.
224///
225/// The request channel is bounded by default using
226/// [`MqttOptions::request_channel_capacity`]. Use [`Self::capacity`] to override
227/// the bounded capacity, or [`Self::unbounded`] to opt into an unbounded request channel.
228#[derive(Debug)]
229pub struct AsyncClientBuilder {
230    options: MqttOptions,
231    capacity: RequestChannelCapacity,
232}
233
234#[must_use]
235fn build_async_client(
236    options: MqttOptions,
237    capacity: RequestChannelCapacity,
238) -> (AsyncClient, EventLoop) {
239    let (eventloop, request_tx, control_request_tx, immediate_disconnect_tx) =
240        EventLoop::new_for_async_client_with_capacity(options, capacity);
241    let client = AsyncClient {
242        request_tx: RequestSender::WithNotice {
243            requests: request_tx,
244            control_requests: control_request_tx,
245            immediate_disconnect: immediate_disconnect_tx,
246        },
247    };
248
249    (client, eventloop)
250}
251
252impl ClientBuilder {
253    /// Create a new builder for a synchronous [`Client`].
254    #[must_use]
255    pub const fn new(options: MqttOptions) -> Self {
256        let capacity = RequestChannelCapacity::Bounded(options.request_channel_capacity());
257        Self { options, capacity }
258    }
259
260    /// Use a bounded request channel with the given capacity.
261    ///
262    /// `0` creates a bounded zero-capacity rendezvous channel. Use [`Self::unbounded`]
263    /// for an unbounded request channel.
264    #[must_use]
265    pub const fn capacity(mut self, cap: usize) -> Self {
266        self.capacity = RequestChannelCapacity::Bounded(cap);
267        self
268    }
269
270    /// Use an unbounded request channel.
271    #[must_use]
272    pub const fn unbounded(mut self) -> Self {
273        self.capacity = RequestChannelCapacity::Unbounded;
274        self
275    }
276
277    /// Build a synchronous client and connection.
278    ///
279    /// This builder always produces the synchronous client pair so the
280    /// terminal `build()` method matches the entry point that created it.
281    ///
282    /// # Panics
283    ///
284    /// Panics if the current-thread Tokio runtime cannot be created.
285    #[must_use]
286    pub fn build(self) -> (Client, Connection) {
287        let (client, eventloop) = build_async_client(self.options, self.capacity);
288        let client = Client { client };
289
290        let runtime = runtime::Builder::new_current_thread()
291            .enable_all()
292            .build()
293            .unwrap();
294
295        let connection = Connection::new(eventloop, runtime);
296        (client, connection)
297    }
298}
299
300impl AsyncClientBuilder {
301    /// Create a new builder for an asynchronous [`AsyncClient`].
302    #[must_use]
303    pub const fn new(options: MqttOptions) -> Self {
304        let capacity = RequestChannelCapacity::Bounded(options.request_channel_capacity());
305        Self { options, capacity }
306    }
307
308    /// Use a bounded request channel with the given capacity.
309    ///
310    /// `0` creates a bounded zero-capacity rendezvous channel. Use [`Self::unbounded`]
311    /// for an unbounded request channel.
312    #[must_use]
313    pub const fn capacity(mut self, cap: usize) -> Self {
314        self.capacity = RequestChannelCapacity::Bounded(cap);
315        self
316    }
317
318    /// Use an unbounded request channel.
319    #[must_use]
320    pub const fn unbounded(mut self) -> Self {
321        self.capacity = RequestChannelCapacity::Unbounded;
322        self
323    }
324
325    /// Build an asynchronous client and event loop.
326    ///
327    /// This builder always produces the asynchronous client pair so the
328    /// terminal `build()` method matches the entry point that created it.
329    #[must_use]
330    pub fn build(self) -> (AsyncClient, EventLoop) {
331        build_async_client(self.options, self.capacity)
332    }
333}
334
335impl AsyncClient {
336    /// Create a builder for an [`AsyncClient`].
337    ///
338    /// The returned [`AsyncClientBuilder`] only builds the asynchronous
339    /// client pair, which keeps the terminal `build()` method aligned with
340    /// this entry point.
341    #[must_use]
342    pub const fn builder(options: MqttOptions) -> AsyncClientBuilder {
343        AsyncClientBuilder::new(options)
344    }
345
346    /// Create a new `AsyncClient` from a channel `Sender`.
347    ///
348    /// This is mostly useful for creating a test instance where you can
349    /// listen on the corresponding receiver.
350    #[must_use]
351    pub const fn from_senders(request_tx: Sender<Request>) -> Self {
352        Self {
353            request_tx: RequestSender::Plain(request_tx),
354        }
355    }
356
357    async fn send_request_async(&self, request: Request) -> Result<(), ClientError> {
358        match &self.request_tx {
359            RequestSender::Plain(tx) => tx.send_async(request).await.map_err(ClientError::from),
360            RequestSender::WithNotice {
361                requests,
362                control_requests,
363                ..
364            } => {
365                let tx = if is_publish_request(&request) {
366                    requests
367                } else {
368                    control_requests
369                };
370                tx.send_async(RequestEnvelope::plain(request))
371                    .await
372                    .map_err(map_send_envelope_error)
373            }
374        }
375    }
376
377    fn try_send_request(&self, request: Request) -> Result<(), ClientError> {
378        match &self.request_tx {
379            RequestSender::Plain(tx) => tx.try_send(request).map_err(ClientError::from),
380            RequestSender::WithNotice {
381                requests,
382                control_requests,
383                ..
384            } => {
385                let tx = if is_publish_request(&request) {
386                    requests
387                } else {
388                    control_requests
389                };
390                tx.try_send(RequestEnvelope::plain(request))
391                    .map_err(map_try_send_envelope_error)
392            }
393        }
394    }
395
396    fn send_request(&self, request: Request) -> Result<(), ClientError> {
397        match &self.request_tx {
398            RequestSender::Plain(tx) => tx.send(request).map_err(ClientError::from),
399            RequestSender::WithNotice {
400                requests,
401                control_requests,
402                ..
403            } => {
404                let tx = if is_publish_request(&request) {
405                    requests
406                } else {
407                    control_requests
408                };
409                tx.send(RequestEnvelope::plain(request))
410                    .map_err(map_send_envelope_error)
411            }
412        }
413    }
414
415    async fn send_immediate_disconnect_async(&self, request: Request) -> Result<(), ClientError> {
416        match &self.request_tx {
417            RequestSender::Plain(tx) => tx.send_async(request).await.map_err(ClientError::from),
418            RequestSender::WithNotice {
419                immediate_disconnect,
420                ..
421            } => immediate_disconnect
422                .send_async(RequestEnvelope::plain(request))
423                .await
424                .map_err(map_send_envelope_error),
425        }
426    }
427
428    fn send_immediate_disconnect(&self, request: Request) -> Result<(), ClientError> {
429        match &self.request_tx {
430            RequestSender::Plain(tx) => tx.send(request).map_err(ClientError::from),
431            RequestSender::WithNotice {
432                immediate_disconnect,
433                ..
434            } => immediate_disconnect
435                .send(RequestEnvelope::plain(request))
436                .map_err(map_send_envelope_error),
437        }
438    }
439
440    fn try_send_immediate_disconnect(&self, request: Request) -> Result<(), ClientError> {
441        match &self.request_tx {
442            RequestSender::Plain(tx) => tx.try_send(request).map_err(ClientError::from),
443            RequestSender::WithNotice {
444                immediate_disconnect,
445                ..
446            } => immediate_disconnect
447                .try_send(RequestEnvelope::plain(request))
448                .map_err(map_try_send_envelope_error),
449        }
450    }
451
452    async fn send_tracked_publish_async(
453        &self,
454        publish: Publish,
455    ) -> Result<PublishNotice, ClientError> {
456        let RequestSender::WithNotice {
457            requests: request_tx,
458            ..
459        } = &self.request_tx
460        else {
461            return Err(ClientError::TrackingUnavailable);
462        };
463
464        let (notice_tx, notice) = PublishNoticeTx::new();
465        request_tx
466            .send_async(RequestEnvelope::tracked_publish(publish, notice_tx))
467            .await
468            .map_err(map_send_envelope_error)?;
469        Ok(notice)
470    }
471
472    fn try_send_tracked_publish(&self, publish: Publish) -> Result<PublishNotice, ClientError> {
473        let RequestSender::WithNotice {
474            requests: request_tx,
475            ..
476        } = &self.request_tx
477        else {
478            return Err(ClientError::TrackingUnavailable);
479        };
480
481        let (notice_tx, notice) = PublishNoticeTx::new();
482        request_tx
483            .try_send(RequestEnvelope::tracked_publish(publish, notice_tx))
484            .map_err(map_try_send_envelope_error)?;
485        Ok(notice)
486    }
487
488    async fn send_tracked_subscribe_async(
489        &self,
490        subscribe: Subscribe,
491    ) -> Result<SubscribeNotice, ClientError> {
492        let RequestSender::WithNotice {
493            control_requests: request_tx,
494            ..
495        } = &self.request_tx
496        else {
497            return Err(ClientError::TrackingUnavailable);
498        };
499
500        let (notice_tx, notice) = SubscribeNoticeTx::new();
501        request_tx
502            .send_async(RequestEnvelope::tracked_subscribe(subscribe, notice_tx))
503            .await
504            .map_err(map_send_envelope_error)?;
505        Ok(notice)
506    }
507
508    fn try_send_tracked_subscribe(
509        &self,
510        subscribe: Subscribe,
511    ) -> Result<SubscribeNotice, ClientError> {
512        let RequestSender::WithNotice {
513            control_requests: request_tx,
514            ..
515        } = &self.request_tx
516        else {
517            return Err(ClientError::TrackingUnavailable);
518        };
519
520        let (notice_tx, notice) = SubscribeNoticeTx::new();
521        request_tx
522            .try_send(RequestEnvelope::tracked_subscribe(subscribe, notice_tx))
523            .map_err(map_try_send_envelope_error)?;
524        Ok(notice)
525    }
526
527    async fn send_tracked_unsubscribe_async(
528        &self,
529        unsubscribe: Unsubscribe,
530    ) -> Result<UnsubscribeNotice, ClientError> {
531        let RequestSender::WithNotice {
532            control_requests: request_tx,
533            ..
534        } = &self.request_tx
535        else {
536            return Err(ClientError::TrackingUnavailable);
537        };
538
539        let (notice_tx, notice) = UnsubscribeNoticeTx::new();
540        request_tx
541            .send_async(RequestEnvelope::tracked_unsubscribe(unsubscribe, notice_tx))
542            .await
543            .map_err(map_send_envelope_error)?;
544        Ok(notice)
545    }
546
547    fn try_send_tracked_unsubscribe(
548        &self,
549        unsubscribe: Unsubscribe,
550    ) -> Result<UnsubscribeNotice, ClientError> {
551        let RequestSender::WithNotice {
552            control_requests: request_tx,
553            ..
554        } = &self.request_tx
555        else {
556            return Err(ClientError::TrackingUnavailable);
557        };
558
559        let (notice_tx, notice) = UnsubscribeNoticeTx::new();
560        request_tx
561            .try_send(RequestEnvelope::tracked_unsubscribe(unsubscribe, notice_tx))
562            .map_err(map_try_send_envelope_error)?;
563        Ok(notice)
564    }
565
566    async fn send_tracked_auth_async(&self, auth: Auth) -> Result<AuthNotice, ClientError> {
567        let RequestSender::WithNotice {
568            control_requests: request_tx,
569            ..
570        } = &self.request_tx
571        else {
572            return Err(ClientError::TrackingUnavailable);
573        };
574
575        let (notice_tx, notice) = AuthNoticeTx::new();
576        request_tx
577            .send_async(RequestEnvelope::tracked_auth(auth, notice_tx))
578            .await
579            .map_err(map_send_envelope_error)?;
580        Ok(notice)
581    }
582
583    fn send_tracked_auth(&self, auth: Auth) -> Result<AuthNotice, ClientError> {
584        let RequestSender::WithNotice {
585            control_requests: request_tx,
586            ..
587        } = &self.request_tx
588        else {
589            return Err(ClientError::TrackingUnavailable);
590        };
591
592        let (notice_tx, notice) = AuthNoticeTx::new();
593        request_tx
594            .send(RequestEnvelope::tracked_auth(auth, notice_tx))
595            .map_err(map_send_envelope_error)?;
596        Ok(notice)
597    }
598
599    fn try_send_tracked_auth(&self, auth: Auth) -> Result<AuthNotice, ClientError> {
600        let RequestSender::WithNotice {
601            control_requests: request_tx,
602            ..
603        } = &self.request_tx
604        else {
605            return Err(ClientError::TrackingUnavailable);
606        };
607
608        let (notice_tx, notice) = AuthNoticeTx::new();
609        request_tx
610            .try_send(RequestEnvelope::tracked_auth(auth, notice_tx))
611            .map_err(map_try_send_envelope_error)?;
612        Ok(notice)
613    }
614
615    /// Sends a MQTT Publish to the `EventLoop`.
616    async fn handle_publish<T, P>(
617        &self,
618        topic: T,
619        qos: QoS,
620        retain: bool,
621        payload: P,
622        properties: Option<PublishProperties>,
623    ) -> Result<(), ClientError>
624    where
625        T: Into<PublishTopic>,
626        P: Into<Bytes>,
627    {
628        let (topic, needs_validation) = topic.into().into_string_and_validation();
629        let invalid_topic = (needs_validation && !valid_topic(&topic))
630            || empty_topic_without_valid_alias(&topic, properties.as_ref());
631        let mut publish = Publish::new(topic, qos, payload, properties);
632        publish.retain = retain;
633        let publish = Request::Publish(publish);
634
635        if invalid_topic {
636            return Err(ClientError::Request(Box::new(publish)));
637        }
638
639        self.send_request_async(publish).await?;
640        Ok(())
641    }
642
643    async fn handle_publish_tracked<T, P>(
644        &self,
645        topic: T,
646        qos: QoS,
647        retain: bool,
648        payload: P,
649        properties: Option<PublishProperties>,
650    ) -> Result<PublishNotice, ClientError>
651    where
652        T: Into<PublishTopic>,
653        P: Into<Bytes>,
654    {
655        let (topic, needs_validation) = topic.into().into_string_and_validation();
656        let invalid_topic = (needs_validation && !valid_topic(&topic))
657            || empty_topic_without_valid_alias(&topic, properties.as_ref());
658        let mut publish = Publish::new(topic, qos, payload, properties);
659        publish.retain = retain;
660        let request = Request::Publish(publish.clone());
661
662        if invalid_topic {
663            return Err(ClientError::Request(Box::new(request)));
664        }
665
666        self.send_tracked_publish_async(publish).await
667    }
668
669    /// Sends a MQTT Publish with properties to the `EventLoop`.
670    ///
671    /// # Errors
672    ///
673    /// Returns an error if the topic or topic alias usage is invalid, or if
674    /// the request cannot be queued on the event loop.
675    pub async fn publish_with_properties<T, P>(
676        &self,
677        topic: T,
678        qos: QoS,
679        retain: bool,
680        payload: P,
681        properties: PublishProperties,
682    ) -> Result<(), ClientError>
683    where
684        T: Into<PublishTopic>,
685        P: Into<Bytes>,
686    {
687        self.handle_publish(topic, qos, retain, payload, Some(properties))
688            .await
689    }
690
691    /// Sends a MQTT Publish with properties to the `EventLoop` and returns a tracked notice.
692    ///
693    /// # Errors
694    ///
695    /// Returns an error if the topic or topic alias usage is invalid, or if
696    /// the request cannot be queued on the event loop.
697    pub async fn publish_with_properties_tracked<T, P>(
698        &self,
699        topic: T,
700        qos: QoS,
701        retain: bool,
702        payload: P,
703        properties: PublishProperties,
704    ) -> Result<PublishNotice, ClientError>
705    where
706        T: Into<PublishTopic>,
707        P: Into<Bytes>,
708    {
709        self.handle_publish_tracked(topic, qos, retain, payload, Some(properties))
710            .await
711    }
712
713    /// Sends a MQTT Publish to the `EventLoop`.
714    ///
715    /// # Errors
716    ///
717    /// Returns an error if the topic or topic alias usage is invalid, or if
718    /// the request cannot be queued on the event loop.
719    pub async fn publish<T, P>(
720        &self,
721        topic: T,
722        qos: QoS,
723        retain: bool,
724        payload: P,
725    ) -> Result<(), ClientError>
726    where
727        T: Into<PublishTopic>,
728        P: Into<Bytes>,
729    {
730        self.handle_publish(topic, qos, retain, payload, None).await
731    }
732
733    /// Sends a MQTT Publish to the `EventLoop` and returns a tracked notice.
734    ///
735    /// # Errors
736    ///
737    /// Returns an error if the topic or topic alias usage is invalid, or if
738    /// the request cannot be queued on the event loop.
739    pub async fn publish_tracked<T, P>(
740        &self,
741        topic: T,
742        qos: QoS,
743        retain: bool,
744        payload: P,
745    ) -> Result<PublishNotice, ClientError>
746    where
747        T: Into<PublishTopic>,
748        P: Into<Bytes>,
749    {
750        self.handle_publish_tracked(topic, qos, retain, payload, None)
751            .await
752    }
753
754    /// Attempts to send a MQTT Publish to the `EventLoop`.
755    fn handle_try_publish<T, P>(
756        &self,
757        topic: T,
758        qos: QoS,
759        retain: bool,
760        payload: P,
761        properties: Option<PublishProperties>,
762    ) -> Result<(), ClientError>
763    where
764        T: Into<PublishTopic>,
765        P: Into<Bytes>,
766    {
767        let (topic, needs_validation) = topic.into().into_string_and_validation();
768        let invalid_topic = (needs_validation && !valid_topic(&topic))
769            || empty_topic_without_valid_alias(&topic, properties.as_ref());
770        let mut publish = Publish::new(topic, qos, payload, properties);
771        publish.retain = retain;
772        let publish = Request::Publish(publish);
773
774        if invalid_topic {
775            return Err(ClientError::TryRequest(Box::new(publish)));
776        }
777
778        self.try_send_request(publish)?;
779        Ok(())
780    }
781
782    fn handle_try_publish_tracked<T, P>(
783        &self,
784        topic: T,
785        qos: QoS,
786        retain: bool,
787        payload: P,
788        properties: Option<PublishProperties>,
789    ) -> Result<PublishNotice, ClientError>
790    where
791        T: Into<PublishTopic>,
792        P: Into<Bytes>,
793    {
794        let (topic, needs_validation) = topic.into().into_string_and_validation();
795        let invalid_topic = (needs_validation && !valid_topic(&topic))
796            || empty_topic_without_valid_alias(&topic, properties.as_ref());
797        let mut publish = Publish::new(topic, qos, payload, properties);
798        publish.retain = retain;
799        let request = Request::Publish(publish.clone());
800
801        if invalid_topic {
802            return Err(ClientError::TryRequest(Box::new(request)));
803        }
804
805        self.try_send_tracked_publish(publish)
806    }
807
808    /// Attempts to send a MQTT Publish with properties to the `EventLoop`.
809    ///
810    /// This is the non-blocking publish API for overload policies that may drop
811    /// outgoing publishes. If the bounded request channel is full, this returns
812    /// an error immediately and the publish has not been queued.
813    ///
814    /// # Errors
815    ///
816    /// Returns an error if the topic or topic alias usage is invalid, or if
817    /// the request cannot be queued immediately on the event loop.
818    pub fn try_publish_with_properties<T, P>(
819        &self,
820        topic: T,
821        qos: QoS,
822        retain: bool,
823        payload: P,
824        properties: PublishProperties,
825    ) -> Result<(), ClientError>
826    where
827        T: Into<PublishTopic>,
828        P: Into<Bytes>,
829    {
830        self.handle_try_publish(topic, qos, retain, payload, Some(properties))
831    }
832
833    /// Attempts to send a MQTT Publish with properties to the `EventLoop` and returns a tracked notice.
834    ///
835    /// This is the non-blocking tracked publish API for overload policies that
836    /// may drop outgoing publishes. If the bounded request channel is full, this
837    /// returns an error immediately and the publish has not been queued.
838    ///
839    /// # Errors
840    ///
841    /// Returns an error if the topic or topic alias usage is invalid, or if
842    /// the request cannot be queued immediately on the event loop.
843    pub fn try_publish_with_properties_tracked<T, P>(
844        &self,
845        topic: T,
846        qos: QoS,
847        retain: bool,
848        payload: P,
849        properties: PublishProperties,
850    ) -> Result<PublishNotice, ClientError>
851    where
852        T: Into<PublishTopic>,
853        P: Into<Bytes>,
854    {
855        self.handle_try_publish_tracked(topic, qos, retain, payload, Some(properties))
856    }
857
858    /// Attempts to send a MQTT Publish to the `EventLoop`.
859    ///
860    /// This is the non-blocking publish API for overload policies that may drop
861    /// outgoing publishes. If the bounded request channel is full, this returns
862    /// an error immediately and the publish has not been queued.
863    ///
864    /// # Errors
865    ///
866    /// Returns an error if the topic or topic alias usage is invalid, or if
867    /// the request cannot be queued immediately on the event loop.
868    pub fn try_publish<T, P>(
869        &self,
870        topic: T,
871        qos: QoS,
872        retain: bool,
873        payload: P,
874    ) -> Result<(), ClientError>
875    where
876        T: Into<PublishTopic>,
877        P: Into<Bytes>,
878    {
879        self.handle_try_publish(topic, qos, retain, payload, None)
880    }
881
882    /// Attempts to send a MQTT Publish to the `EventLoop` and returns a tracked notice.
883    ///
884    /// This is the non-blocking tracked publish API for overload policies that
885    /// may drop outgoing publishes. If the bounded request channel is full, this
886    /// returns an error immediately and the publish has not been queued.
887    ///
888    /// # Errors
889    ///
890    /// Returns an error if the topic or topic alias usage is invalid, or if
891    /// the request cannot be queued immediately on the event loop.
892    pub fn try_publish_tracked<T, P>(
893        &self,
894        topic: T,
895        qos: QoS,
896        retain: bool,
897        payload: P,
898    ) -> Result<PublishNotice, ClientError>
899    where
900        T: Into<PublishTopic>,
901        P: Into<Bytes>,
902    {
903        self.handle_try_publish_tracked(topic, qos, retain, payload, None)
904    }
905
906    /// Prepares a MQTT PubAck/PubRec packet for manual acknowledgement.
907    ///
908    /// Returns `None` for `QoS0` publishes, which do not require acknowledgement.
909    ///
910    /// This is typically used together with
911    /// [`MqttOptions::set_manual_acks`](`crate::MqttOptions::set_manual_acks`)
912    /// when acknowledgement must be deferred or customized.
913    ///
914    /// # Examples
915    ///
916    /// ```no_run
917    /// use rumqttc::mqttbytes::v5::{PubAckProperties, PubAckReason, Publish};
918    /// use rumqttc::{AsyncClient, ManualAck, MqttOptions};
919    ///
920    /// # async fn example(client: AsyncClient, publish: Publish) -> Result<(), rumqttc::ClientError> {
921    /// let Some(mut ack) = client.prepare_ack(&publish) else {
922    ///     return Ok(());
923    /// };
924    ///
925    /// if let ManualAck::PubAck(puback) = &mut ack {
926    ///     puback.reason = PubAckReason::NoMatchingSubscribers;
927    ///     puback.properties = Some(PubAckProperties {
928    ///         reason_string: Some("No active subscribers now".to_owned()),
929    ///         user_properties: vec![("source".to_owned(), "application".to_owned())],
930    ///     });
931    /// }
932    ///
933    /// client.manual_ack(ack).await?;
934    /// # Ok(())
935    /// # }
936    /// ```
937    pub const fn prepare_ack(&self, publish: &Publish) -> Option<ManualAck> {
938        prepare_ack(publish)
939    }
940
941    /// Sends a prepared MQTT PubAck/PubRec to the `EventLoop`.
942    ///
943    /// This is useful when `manual_acks` is enabled and acknowledgement must be deferred.
944    ///
945    /// # Errors
946    ///
947    /// Returns an error if the acknowledgement cannot be queued on the event
948    /// loop.
949    pub async fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
950        self.send_request_async(ack.into_request()).await?;
951        Ok(())
952    }
953
954    /// Attempts to send a prepared MQTT PubAck/PubRec to the `EventLoop`.
955    ///
956    /// # Errors
957    ///
958    /// Returns an error if the acknowledgement cannot be queued immediately on
959    /// the event loop.
960    pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
961        self.try_send_request(ack.into_request())?;
962        Ok(())
963    }
964
965    /// Sends a MQTT PubAck/PubRec to the `EventLoop` based on publish `QoS`.
966    /// Only needed if the `manual_acks` flag is set.
967    ///
968    /// # Errors
969    ///
970    /// Returns an error if the derived acknowledgement cannot be queued on the
971    /// event loop.
972    pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
973        if let Some(ack) = self.prepare_ack(publish) {
974            self.manual_ack(ack).await?;
975        }
976        Ok(())
977    }
978
979    /// Attempts to send a MQTT PubAck/PubRec to the `EventLoop` based on publish `QoS`.
980    /// Only needed if the `manual_acks` flag is set.
981    ///
982    /// # Errors
983    ///
984    /// Returns an error if the derived acknowledgement cannot be queued
985    /// immediately on the event loop.
986    pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
987        if let Some(ack) = self.prepare_ack(publish) {
988            self.try_manual_ack(ack)?;
989        }
990        Ok(())
991    }
992
993    /// Sends a MQTT AUTH packet to the `EventLoop`.
994    ///
995    /// # Errors
996    ///
997    /// Returns an error if the AUTH packet cannot be queued on the event loop.
998    pub async fn reauth(&self, properties: Option<AuthProperties>) -> Result<(), ClientError> {
999        let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
1000        let auth = Request::Auth(auth);
1001        self.send_request_async(auth).await?;
1002        Ok(())
1003    }
1004
1005    /// Sends a tracked MQTT re-authentication request to the `EventLoop`.
1006    ///
1007    /// # Errors
1008    ///
1009    /// Returns an error if the AUTH packet cannot be queued on the event loop
1010    /// or if tracked request notices are unavailable for this client instance.
1011    pub async fn reauth_tracked(
1012        &self,
1013        properties: Option<AuthProperties>,
1014    ) -> Result<AuthNotice, ClientError> {
1015        let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
1016        self.send_tracked_auth_async(auth).await
1017    }
1018
1019    /// Attempts to send a MQTT AUTH packet to the `EventLoop`.
1020    ///
1021    /// # Errors
1022    ///
1023    /// Returns an error if the AUTH packet cannot be queued immediately on the
1024    /// event loop.
1025    pub fn try_reauth(&self, properties: Option<AuthProperties>) -> Result<(), ClientError> {
1026        let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
1027        let auth = Request::Auth(auth);
1028        self.try_send_request(auth)?;
1029        Ok(())
1030    }
1031
1032    /// Attempts to send a tracked MQTT re-authentication request to the `EventLoop`.
1033    ///
1034    /// # Errors
1035    ///
1036    /// Returns an error if the AUTH packet cannot be queued immediately on the
1037    /// event loop or if tracked request notices are unavailable for this client
1038    /// instance.
1039    pub fn try_reauth_tracked(
1040        &self,
1041        properties: Option<AuthProperties>,
1042    ) -> Result<AuthNotice, ClientError> {
1043        let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
1044        self.try_send_tracked_auth(auth)
1045    }
1046
1047    /// Sends a MQTT Publish to the `EventLoop`
1048    async fn handle_publish_bytes<T>(
1049        &self,
1050        topic: T,
1051        qos: QoS,
1052        retain: bool,
1053        payload: Bytes,
1054        properties: Option<PublishProperties>,
1055    ) -> Result<(), ClientError>
1056    where
1057        T: Into<PublishTopic>,
1058    {
1059        let (topic, needs_validation) = topic.into().into_string_and_validation();
1060        let invalid_topic = (needs_validation && !valid_topic(&topic))
1061            || empty_topic_without_valid_alias(&topic, properties.as_ref());
1062        let mut publish = Publish::new(topic, qos, payload, properties);
1063        publish.retain = retain;
1064        let publish = Request::Publish(publish);
1065
1066        if invalid_topic {
1067            return Err(ClientError::Request(Box::new(publish)));
1068        }
1069
1070        self.send_request_async(publish).await?;
1071        Ok(())
1072    }
1073
1074    async fn handle_publish_bytes_tracked<T>(
1075        &self,
1076        topic: T,
1077        qos: QoS,
1078        retain: bool,
1079        payload: Bytes,
1080        properties: Option<PublishProperties>,
1081    ) -> Result<PublishNotice, ClientError>
1082    where
1083        T: Into<PublishTopic>,
1084    {
1085        let (topic, needs_validation) = topic.into().into_string_and_validation();
1086        let invalid_topic = (needs_validation && !valid_topic(&topic))
1087            || empty_topic_without_valid_alias(&topic, properties.as_ref());
1088        let mut publish = Publish::new(topic, qos, payload, properties);
1089        publish.retain = retain;
1090        let request = Request::Publish(publish.clone());
1091
1092        if invalid_topic {
1093            return Err(ClientError::Request(Box::new(request)));
1094        }
1095
1096        self.send_tracked_publish_async(publish).await
1097    }
1098
1099    /// Sends a MQTT Publish with properties to the `EventLoop`.
1100    ///
1101    /// # Errors
1102    ///
1103    /// Returns an error if the topic or topic alias usage is invalid, or if
1104    /// the request cannot be queued on the event loop.
1105    pub async fn publish_bytes_with_properties<T>(
1106        &self,
1107        topic: T,
1108        qos: QoS,
1109        retain: bool,
1110        payload: Bytes,
1111        properties: PublishProperties,
1112    ) -> Result<(), ClientError>
1113    where
1114        T: Into<PublishTopic>,
1115    {
1116        self.handle_publish_bytes(topic, qos, retain, payload, Some(properties))
1117            .await
1118    }
1119
1120    /// Sends a MQTT Publish with `Bytes` payload and properties, returning a tracked notice.
1121    ///
1122    /// # Errors
1123    ///
1124    /// Returns an error if the topic or topic alias usage is invalid, or if
1125    /// the request cannot be queued on the event loop.
1126    pub async fn publish_bytes_with_properties_tracked<T>(
1127        &self,
1128        topic: T,
1129        qos: QoS,
1130        retain: bool,
1131        payload: Bytes,
1132        properties: PublishProperties,
1133    ) -> Result<PublishNotice, ClientError>
1134    where
1135        T: Into<PublishTopic>,
1136    {
1137        self.handle_publish_bytes_tracked(topic, qos, retain, payload, Some(properties))
1138            .await
1139    }
1140
1141    /// Sends a MQTT Publish with `Bytes` payload to the `EventLoop`.
1142    ///
1143    /// # Errors
1144    ///
1145    /// Returns an error if the topic or topic alias usage is invalid, or if
1146    /// the request cannot be queued on the event loop.
1147    pub async fn publish_bytes<T>(
1148        &self,
1149        topic: T,
1150        qos: QoS,
1151        retain: bool,
1152        payload: Bytes,
1153    ) -> Result<(), ClientError>
1154    where
1155        T: Into<PublishTopic>,
1156    {
1157        self.handle_publish_bytes(topic, qos, retain, payload, None)
1158            .await
1159    }
1160
1161    /// Sends a MQTT Publish with `Bytes` payload to the `EventLoop` and returns a tracked notice.
1162    ///
1163    /// # Errors
1164    ///
1165    /// Returns an error if the topic or topic alias usage is invalid, or if
1166    /// the request cannot be queued on the event loop.
1167    pub async fn publish_bytes_tracked<T>(
1168        &self,
1169        topic: T,
1170        qos: QoS,
1171        retain: bool,
1172        payload: Bytes,
1173    ) -> Result<PublishNotice, ClientError>
1174    where
1175        T: Into<PublishTopic>,
1176    {
1177        self.handle_publish_bytes_tracked(topic, qos, retain, payload, None)
1178            .await
1179    }
1180
1181    /// Sends a MQTT Subscribe to the `EventLoop`
1182    async fn handle_subscribe<S: Into<String>>(
1183        &self,
1184        topic: S,
1185        qos: QoS,
1186        properties: Option<SubscribeProperties>,
1187    ) -> Result<(), ClientError> {
1188        let filter = Filter::new(topic, qos);
1189        let subscribe = Subscribe::new(filter, properties);
1190        if !subscribe_has_valid_filters(&subscribe) {
1191            return Err(ClientError::Request(Box::new(subscribe.into())));
1192        }
1193
1194        self.send_request_async(subscribe.into()).await?;
1195        Ok(())
1196    }
1197
1198    async fn handle_subscribe_tracked<S: Into<String>>(
1199        &self,
1200        topic: S,
1201        qos: QoS,
1202        properties: Option<SubscribeProperties>,
1203    ) -> Result<SubscribeNotice, ClientError> {
1204        let filter = Filter::new(topic, qos);
1205        let subscribe = Subscribe::new(filter, properties);
1206        if !subscribe_has_valid_filters(&subscribe) {
1207            return Err(ClientError::Request(Box::new(subscribe.into())));
1208        }
1209
1210        self.send_tracked_subscribe_async(subscribe).await
1211    }
1212
1213    /// Sends a MQTT Subscribe with properties to the `EventLoop`.
1214    ///
1215    /// # Errors
1216    ///
1217    /// Returns an error if the topic filter is invalid or if the request
1218    /// cannot be queued on the event loop.
1219    pub async fn subscribe_with_properties<S: Into<String>>(
1220        &self,
1221        topic: S,
1222        qos: QoS,
1223        properties: SubscribeProperties,
1224    ) -> Result<(), ClientError> {
1225        self.handle_subscribe(topic, qos, Some(properties)).await
1226    }
1227
1228    /// Sends a tracked MQTT Subscribe with properties to the `EventLoop`.
1229    ///
1230    /// # Errors
1231    ///
1232    /// Returns an error if the topic filter is invalid or if the request
1233    /// cannot be queued on the event loop.
1234    pub async fn subscribe_with_properties_tracked<S: Into<String>>(
1235        &self,
1236        topic: S,
1237        qos: QoS,
1238        properties: SubscribeProperties,
1239    ) -> Result<SubscribeNotice, ClientError> {
1240        self.handle_subscribe_tracked(topic, qos, Some(properties))
1241            .await
1242    }
1243
1244    /// Sends a MQTT Subscribe to the `EventLoop`.
1245    ///
1246    /// # Errors
1247    ///
1248    /// Returns an error if the topic filter is invalid or if the request
1249    /// cannot be queued on the event loop.
1250    pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
1251        self.handle_subscribe(topic, qos, None).await
1252    }
1253
1254    /// Sends a tracked MQTT Subscribe to the `EventLoop`.
1255    ///
1256    /// # Errors
1257    ///
1258    /// Returns an error if the topic filter is invalid or if the request
1259    /// cannot be queued on the event loop.
1260    pub async fn subscribe_tracked<S: Into<String>>(
1261        &self,
1262        topic: S,
1263        qos: QoS,
1264    ) -> Result<SubscribeNotice, ClientError> {
1265        self.handle_subscribe_tracked(topic, qos, None).await
1266    }
1267
1268    /// Attempts to send a MQTT Subscribe to the `EventLoop`
1269    fn handle_try_subscribe<S: Into<String>>(
1270        &self,
1271        topic: S,
1272        qos: QoS,
1273        properties: Option<SubscribeProperties>,
1274    ) -> Result<(), ClientError> {
1275        let filter = Filter::new(topic, qos);
1276        let subscribe = Subscribe::new(filter, properties);
1277        if !subscribe_has_valid_filters(&subscribe) {
1278            return Err(ClientError::TryRequest(Box::new(subscribe.into())));
1279        }
1280
1281        self.try_send_request(subscribe.into())?;
1282        Ok(())
1283    }
1284
1285    fn handle_try_subscribe_tracked<S: Into<String>>(
1286        &self,
1287        topic: S,
1288        qos: QoS,
1289        properties: Option<SubscribeProperties>,
1290    ) -> Result<SubscribeNotice, ClientError> {
1291        let filter = Filter::new(topic, qos);
1292        let subscribe = Subscribe::new(filter, properties);
1293        if !subscribe_has_valid_filters(&subscribe) {
1294            return Err(ClientError::TryRequest(Box::new(subscribe.into())));
1295        }
1296
1297        self.try_send_tracked_subscribe(subscribe)
1298    }
1299
1300    /// Attempts to send a MQTT Subscribe with properties to the `EventLoop`.
1301    ///
1302    /// # Errors
1303    ///
1304    /// Returns an error if the topic filter is invalid or if the request
1305    /// cannot be queued immediately on the event loop.
1306    pub fn try_subscribe_with_properties<S: Into<String>>(
1307        &self,
1308        topic: S,
1309        qos: QoS,
1310        properties: SubscribeProperties,
1311    ) -> Result<(), ClientError> {
1312        self.handle_try_subscribe(topic, qos, Some(properties))
1313    }
1314
1315    /// Attempts to send a tracked MQTT Subscribe with properties to the `EventLoop`.
1316    ///
1317    /// # Errors
1318    ///
1319    /// Returns an error if the topic filter is invalid or if the request
1320    /// cannot be queued immediately on the event loop.
1321    pub fn try_subscribe_with_properties_tracked<S: Into<String>>(
1322        &self,
1323        topic: S,
1324        qos: QoS,
1325        properties: SubscribeProperties,
1326    ) -> Result<SubscribeNotice, ClientError> {
1327        self.handle_try_subscribe_tracked(topic, qos, Some(properties))
1328    }
1329
1330    /// Attempts to send a MQTT Subscribe to the `EventLoop`.
1331    ///
1332    /// # Errors
1333    ///
1334    /// Returns an error if the topic filter is invalid or if the request
1335    /// cannot be queued immediately on the event loop.
1336    pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
1337        self.handle_try_subscribe(topic, qos, None)
1338    }
1339
1340    /// Attempts to send a tracked MQTT Subscribe to the `EventLoop`.
1341    ///
1342    /// # Errors
1343    ///
1344    /// Returns an error if the topic filter is invalid or if the request
1345    /// cannot be queued immediately on the event loop.
1346    pub fn try_subscribe_tracked<S: Into<String>>(
1347        &self,
1348        topic: S,
1349        qos: QoS,
1350    ) -> Result<SubscribeNotice, ClientError> {
1351        self.handle_try_subscribe_tracked(topic, qos, None)
1352    }
1353
1354    /// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
1355    async fn handle_subscribe_many<T>(
1356        &self,
1357        topics: T,
1358        properties: Option<SubscribeProperties>,
1359    ) -> Result<(), ClientError>
1360    where
1361        T: IntoIterator<Item = Filter>,
1362    {
1363        let subscribe = Subscribe::new_many(topics, properties);
1364        if !subscribe_has_valid_filters(&subscribe) {
1365            return Err(ClientError::Request(Box::new(subscribe.into())));
1366        }
1367
1368        self.send_request_async(subscribe.into()).await?;
1369
1370        Ok(())
1371    }
1372
1373    async fn handle_subscribe_many_tracked<T>(
1374        &self,
1375        topics: T,
1376        properties: Option<SubscribeProperties>,
1377    ) -> Result<SubscribeNotice, ClientError>
1378    where
1379        T: IntoIterator<Item = Filter>,
1380    {
1381        let subscribe = Subscribe::new_many(topics, properties);
1382        if !subscribe_has_valid_filters(&subscribe) {
1383            return Err(ClientError::Request(Box::new(subscribe.into())));
1384        }
1385
1386        self.send_tracked_subscribe_async(subscribe).await
1387    }
1388
1389    /// Sends a MQTT Subscribe for multiple topics with properties to the `EventLoop`.
1390    ///
1391    /// # Errors
1392    ///
1393    /// Returns an error if the filter list is invalid or if the request cannot
1394    /// be queued on the event loop.
1395    pub async fn subscribe_many_with_properties<T>(
1396        &self,
1397        topics: T,
1398        properties: SubscribeProperties,
1399    ) -> Result<(), ClientError>
1400    where
1401        T: IntoIterator<Item = Filter>,
1402    {
1403        self.handle_subscribe_many(topics, Some(properties)).await
1404    }
1405
1406    /// Sends a tracked MQTT Subscribe for multiple topics with properties to the `EventLoop`.
1407    ///
1408    /// # Errors
1409    ///
1410    /// Returns an error if the filter list is invalid or if the request cannot
1411    /// be queued on the event loop.
1412    pub async fn subscribe_many_with_properties_tracked<T>(
1413        &self,
1414        topics: T,
1415        properties: SubscribeProperties,
1416    ) -> Result<SubscribeNotice, ClientError>
1417    where
1418        T: IntoIterator<Item = Filter>,
1419    {
1420        self.handle_subscribe_many_tracked(topics, Some(properties))
1421            .await
1422    }
1423
1424    /// Sends a MQTT Subscribe for multiple topics to the `EventLoop`.
1425    ///
1426    /// # Errors
1427    ///
1428    /// Returns an error if the filter list is invalid or if the request cannot
1429    /// be queued on the event loop.
1430    pub async fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
1431    where
1432        T: IntoIterator<Item = Filter>,
1433    {
1434        self.handle_subscribe_many(topics, None).await
1435    }
1436
1437    /// Sends a tracked MQTT Subscribe for multiple topics to the `EventLoop`.
1438    ///
1439    /// # Errors
1440    ///
1441    /// Returns an error if the filter list is invalid or if the request cannot
1442    /// be queued on the event loop.
1443    pub async fn subscribe_many_tracked<T>(&self, topics: T) -> Result<SubscribeNotice, ClientError>
1444    where
1445        T: IntoIterator<Item = Filter>,
1446    {
1447        self.handle_subscribe_many_tracked(topics, None).await
1448    }
1449
1450    /// Attempts to send a MQTT Subscribe for multiple topics to the `EventLoop`
1451    fn handle_try_subscribe_many<T>(
1452        &self,
1453        topics: T,
1454        properties: Option<SubscribeProperties>,
1455    ) -> Result<(), ClientError>
1456    where
1457        T: IntoIterator<Item = Filter>,
1458    {
1459        let subscribe = Subscribe::new_many(topics, properties);
1460        if !subscribe_has_valid_filters(&subscribe) {
1461            return Err(ClientError::TryRequest(Box::new(subscribe.into())));
1462        }
1463
1464        self.try_send_request(subscribe.into())?;
1465        Ok(())
1466    }
1467
1468    fn handle_try_subscribe_many_tracked<T>(
1469        &self,
1470        topics: T,
1471        properties: Option<SubscribeProperties>,
1472    ) -> Result<SubscribeNotice, ClientError>
1473    where
1474        T: IntoIterator<Item = Filter>,
1475    {
1476        let subscribe = Subscribe::new_many(topics, properties);
1477        if !subscribe_has_valid_filters(&subscribe) {
1478            return Err(ClientError::TryRequest(Box::new(subscribe.into())));
1479        }
1480
1481        self.try_send_tracked_subscribe(subscribe)
1482    }
1483
1484    /// Attempts to send a MQTT Subscribe for multiple topics with properties to the `EventLoop`.
1485    ///
1486    /// # Errors
1487    ///
1488    /// Returns an error if the filter list is invalid or if the request cannot
1489    /// be queued immediately on the event loop.
1490    pub fn try_subscribe_many_with_properties<T>(
1491        &self,
1492        topics: T,
1493        properties: SubscribeProperties,
1494    ) -> Result<(), ClientError>
1495    where
1496        T: IntoIterator<Item = Filter>,
1497    {
1498        self.handle_try_subscribe_many(topics, Some(properties))
1499    }
1500
1501    /// Attempts to send a tracked MQTT Subscribe for multiple topics with properties to the `EventLoop`.
1502    ///
1503    /// # Errors
1504    ///
1505    /// Returns an error if the filter list is invalid or if the request cannot
1506    /// be queued immediately on the event loop.
1507    pub fn try_subscribe_many_with_properties_tracked<T>(
1508        &self,
1509        topics: T,
1510        properties: SubscribeProperties,
1511    ) -> Result<SubscribeNotice, ClientError>
1512    where
1513        T: IntoIterator<Item = Filter>,
1514    {
1515        self.handle_try_subscribe_many_tracked(topics, Some(properties))
1516    }
1517
1518    /// Attempts to send a MQTT Subscribe for multiple topics to the `EventLoop`.
1519    ///
1520    /// # Errors
1521    ///
1522    /// Returns an error if the filter list is invalid or if the request cannot
1523    /// be queued immediately on the event loop.
1524    pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
1525    where
1526        T: IntoIterator<Item = Filter>,
1527    {
1528        self.handle_try_subscribe_many(topics, None)
1529    }
1530
1531    /// Attempts to send a tracked MQTT Subscribe for multiple topics to the `EventLoop`.
1532    ///
1533    /// # Errors
1534    ///
1535    /// Returns an error if the filter list is invalid or if the request cannot
1536    /// be queued immediately on the event loop.
1537    pub fn try_subscribe_many_tracked<T>(&self, topics: T) -> Result<SubscribeNotice, ClientError>
1538    where
1539        T: IntoIterator<Item = Filter>,
1540    {
1541        self.handle_try_subscribe_many_tracked(topics, None)
1542    }
1543
1544    /// Sends a MQTT Unsubscribe to the `EventLoop`
1545    async fn handle_unsubscribe<S: Into<String>>(
1546        &self,
1547        topic: S,
1548        properties: Option<UnsubscribeProperties>,
1549    ) -> Result<(), ClientError> {
1550        let unsubscribe = Unsubscribe::new(topic, properties);
1551        let request = Request::Unsubscribe(unsubscribe);
1552        self.send_request_async(request).await?;
1553        Ok(())
1554    }
1555
1556    async fn handle_unsubscribe_tracked<S: Into<String>>(
1557        &self,
1558        topic: S,
1559        properties: Option<UnsubscribeProperties>,
1560    ) -> Result<UnsubscribeNotice, ClientError> {
1561        let unsubscribe = Unsubscribe::new(topic, properties);
1562        self.send_tracked_unsubscribe_async(unsubscribe).await
1563    }
1564
1565    /// Sends a MQTT Unsubscribe with properties to the `EventLoop`.
1566    ///
1567    /// # Errors
1568    ///
1569    /// Returns an error if the request cannot be queued on the event loop.
1570    pub async fn unsubscribe_with_properties<S: Into<String>>(
1571        &self,
1572        topic: S,
1573        properties: UnsubscribeProperties,
1574    ) -> Result<(), ClientError> {
1575        self.handle_unsubscribe(topic, Some(properties)).await
1576    }
1577
1578    /// Sends a tracked MQTT Unsubscribe with properties to the `EventLoop`.
1579    ///
1580    /// # Errors
1581    ///
1582    /// Returns an error if the request cannot be queued on the event loop.
1583    pub async fn unsubscribe_with_properties_tracked<S: Into<String>>(
1584        &self,
1585        topic: S,
1586        properties: UnsubscribeProperties,
1587    ) -> Result<UnsubscribeNotice, ClientError> {
1588        self.handle_unsubscribe_tracked(topic, Some(properties))
1589            .await
1590    }
1591
1592    /// Sends a MQTT Unsubscribe to the `EventLoop`.
1593    ///
1594    /// # Errors
1595    ///
1596    /// Returns an error if the request cannot be queued on the event loop.
1597    pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
1598        self.handle_unsubscribe(topic, None).await
1599    }
1600
1601    /// Sends a tracked MQTT Unsubscribe to the `EventLoop`.
1602    ///
1603    /// # Errors
1604    ///
1605    /// Returns an error if the request cannot be queued on the event loop.
1606    pub async fn unsubscribe_tracked<S: Into<String>>(
1607        &self,
1608        topic: S,
1609    ) -> Result<UnsubscribeNotice, ClientError> {
1610        self.handle_unsubscribe_tracked(topic, None).await
1611    }
1612
1613    /// Attempts to send a MQTT Unsubscribe to the `EventLoop`
1614    fn handle_try_unsubscribe<S: Into<String>>(
1615        &self,
1616        topic: S,
1617        properties: Option<UnsubscribeProperties>,
1618    ) -> Result<(), ClientError> {
1619        let unsubscribe = Unsubscribe::new(topic, properties);
1620        let request = Request::Unsubscribe(unsubscribe);
1621        self.try_send_request(request)?;
1622        Ok(())
1623    }
1624
1625    fn handle_try_unsubscribe_tracked<S: Into<String>>(
1626        &self,
1627        topic: S,
1628        properties: Option<UnsubscribeProperties>,
1629    ) -> Result<UnsubscribeNotice, ClientError> {
1630        let unsubscribe = Unsubscribe::new(topic, properties);
1631        self.try_send_tracked_unsubscribe(unsubscribe)
1632    }
1633
1634    /// Attempts to send a MQTT Unsubscribe with properties to the `EventLoop`.
1635    ///
1636    /// # Errors
1637    ///
1638    /// Returns an error if the request cannot be queued immediately on the
1639    /// event loop.
1640    pub fn try_unsubscribe_with_properties<S: Into<String>>(
1641        &self,
1642        topic: S,
1643        properties: UnsubscribeProperties,
1644    ) -> Result<(), ClientError> {
1645        self.handle_try_unsubscribe(topic, Some(properties))
1646    }
1647
1648    /// Attempts to send a tracked MQTT Unsubscribe with properties to the `EventLoop`.
1649    ///
1650    /// # Errors
1651    ///
1652    /// Returns an error if the request cannot be queued immediately on the
1653    /// event loop.
1654    pub fn try_unsubscribe_with_properties_tracked<S: Into<String>>(
1655        &self,
1656        topic: S,
1657        properties: UnsubscribeProperties,
1658    ) -> Result<UnsubscribeNotice, ClientError> {
1659        self.handle_try_unsubscribe_tracked(topic, Some(properties))
1660    }
1661
1662    /// Attempts to send a MQTT Unsubscribe to the `EventLoop`.
1663    ///
1664    /// # Errors
1665    ///
1666    /// Returns an error if the request cannot be queued immediately on the
1667    /// event loop.
1668    pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
1669        self.handle_try_unsubscribe(topic, None)
1670    }
1671
1672    /// Attempts to send a tracked MQTT Unsubscribe to the `EventLoop`.
1673    ///
1674    /// # Errors
1675    ///
1676    /// Returns an error if the request cannot be queued immediately on the
1677    /// event loop.
1678    pub fn try_unsubscribe_tracked<S: Into<String>>(
1679        &self,
1680        topic: S,
1681    ) -> Result<UnsubscribeNotice, ClientError> {
1682        self.handle_try_unsubscribe_tracked(topic, None)
1683    }
1684
1685    /// Queues a graceful MQTT disconnect barrier with default
1686    /// `DisconnectReasonCode::NormalDisconnection`.
1687    ///
1688    /// Once the event loop observes this request, it stops processing later
1689    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
1690    /// for previously accepted outbound `QoS` 1/ `QoS` 2 publishes and tracked
1691    /// subscribe/unsubscribe requests to complete before sending MQTT
1692    /// `DISCONNECT`.
1693    ///
1694    /// This request uses the normal client request channel. Under publish
1695    /// flow-control pressure, it may pass earlier `QoS` 1/ `QoS` 2 publishes
1696    /// that are not currently sendable; once observed, it becomes the graceful
1697    /// drain barrier.
1698    ///
1699    /// # Errors
1700    ///
1701    /// Returns an error if the disconnect request cannot be queued on the
1702    /// event loop.
1703    pub async fn disconnect(&self) -> Result<(), ClientError> {
1704        self.handle_disconnect(DisconnectReasonCode::NormalDisconnection, None)
1705            .await
1706    }
1707
1708    /// Queues a graceful MQTT disconnect barrier with properties.
1709    ///
1710    /// Once the event loop observes this request, it stops processing later
1711    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
1712    /// for previously accepted outbound `QoS` 1/ `QoS` 2 publishes and tracked
1713    /// subscribe/unsubscribe requests to complete before sending MQTT
1714    /// `DISCONNECT`.
1715    ///
1716    /// This request uses the normal client request channel. Under publish
1717    /// flow-control pressure, it may pass earlier `QoS` 1/ `QoS` 2 publishes
1718    /// that are not currently sendable; once observed, it becomes the graceful
1719    /// drain barrier.
1720    ///
1721    /// # Errors
1722    ///
1723    /// Returns an error if the disconnect request cannot be queued on the
1724    /// event loop.
1725    pub async fn disconnect_with_properties(
1726        &self,
1727        reason: DisconnectReasonCode,
1728        properties: DisconnectProperties,
1729    ) -> Result<(), ClientError> {
1730        self.handle_disconnect(reason, Some(properties)).await
1731    }
1732
1733    /// Queues a graceful MQTT disconnect barrier with a drain timeout.
1734    ///
1735    /// Once the event loop observes this request, it stops processing later
1736    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
1737    /// up to `timeout` for previously accepted outbound `QoS` 1/ `QoS` 2 publishes
1738    /// and tracked subscribe/unsubscribe requests to complete. `QoS` 1 publishes
1739    /// complete on `PUBACK`, `QoS` 2 publishes complete on `PUBCOMP`, tracked
1740    /// subscribes complete on `SUBACK`, and tracked unsubscribes complete on
1741    /// `UNSUBACK`.
1742    ///
1743    /// If the drain completes before the deadline, the event loop sends and
1744    /// flushes MQTT `DISCONNECT`. If the deadline expires first, polling returns
1745    /// `ConnectionError::DisconnectTimeout` and MQTT `DISCONNECT` is not sent.
1746    ///
1747    /// This request uses the normal client request channel. The timeout starts
1748    /// only after the event loop observes this request, not necessarily when
1749    /// this method queues it.
1750    ///
1751    /// # Errors
1752    ///
1753    /// Returns an error if the disconnect request cannot be queued on the
1754    /// event loop.
1755    pub async fn disconnect_with_timeout(&self, timeout: Duration) -> Result<(), ClientError> {
1756        self.handle_disconnect_with_timeout(
1757            DisconnectReasonCode::NormalDisconnection,
1758            None,
1759            timeout,
1760        )
1761        .await
1762    }
1763
1764    /// Queues a graceful MQTT disconnect barrier with properties and a drain timeout.
1765    ///
1766    /// Once the event loop observes this request, it stops processing later
1767    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
1768    /// up to `timeout` for previously accepted outbound `QoS` 1/ `QoS` 2 publishes
1769    /// and tracked subscribe/unsubscribe requests to complete. `QoS` 1 publishes
1770    /// complete on `PUBACK`, `QoS` 2 publishes complete on `PUBCOMP`, tracked
1771    /// subscribes complete on `SUBACK`, and tracked unsubscribes complete on
1772    /// `UNSUBACK`.
1773    ///
1774    /// If the drain completes before the deadline, the event loop sends and
1775    /// flushes MQTT `DISCONNECT`. If the deadline expires first, polling returns
1776    /// `ConnectionError::DisconnectTimeout` and MQTT `DISCONNECT` is not sent.
1777    ///
1778    /// This request uses the normal client request channel. The timeout starts
1779    /// only after the event loop observes this request, not necessarily when
1780    /// this method queues it.
1781    ///
1782    /// # Errors
1783    ///
1784    /// Returns an error if the disconnect request cannot be queued on the
1785    /// event loop.
1786    pub async fn disconnect_with_properties_timeout(
1787        &self,
1788        reason: DisconnectReasonCode,
1789        properties: DisconnectProperties,
1790        timeout: Duration,
1791    ) -> Result<(), ClientError> {
1792        self.handle_disconnect_with_timeout(reason, Some(properties), timeout)
1793            .await
1794    }
1795
1796    /// Sends a MQTT disconnect immediately without waiting for in-flight requests.
1797    ///
1798    /// This request uses a dedicated immediate shutdown channel, not the normal
1799    /// application request channel. It may bypass queued application work and
1800    /// does not wait for unresolved `QoS` 1/ `QoS` 2 publish handshakes.
1801    ///
1802    /// # Errors
1803    ///
1804    /// Returns an error if the disconnect request cannot be queued on the
1805    /// event loop.
1806    pub async fn disconnect_now(&self) -> Result<(), ClientError> {
1807        self.handle_disconnect_now(DisconnectReasonCode::NormalDisconnection, None)
1808            .await
1809    }
1810
1811    /// Sends a MQTT disconnect with properties immediately without waiting for in-flight requests.
1812    ///
1813    /// This request uses a dedicated immediate shutdown channel, not the normal
1814    /// application request channel. It may bypass queued application work and
1815    /// does not wait for unresolved `QoS` 1/ `QoS` 2 publish handshakes.
1816    ///
1817    /// # Errors
1818    ///
1819    /// Returns an error if the disconnect request cannot be queued on the
1820    /// event loop.
1821    pub async fn disconnect_now_with_properties(
1822        &self,
1823        reason: DisconnectReasonCode,
1824        properties: DisconnectProperties,
1825    ) -> Result<(), ClientError> {
1826        self.handle_disconnect_now(reason, Some(properties)).await
1827    }
1828
1829    // Handle disconnect interface which can have properties or not
1830    async fn handle_disconnect(
1831        &self,
1832        reason: DisconnectReasonCode,
1833        properties: Option<DisconnectProperties>,
1834    ) -> Result<(), ClientError> {
1835        let request = Self::build_disconnect_request(reason, properties);
1836        self.send_request_async(request).await?;
1837        Ok(())
1838    }
1839
1840    async fn handle_disconnect_with_timeout(
1841        &self,
1842        reason: DisconnectReasonCode,
1843        properties: Option<DisconnectProperties>,
1844        timeout: Duration,
1845    ) -> Result<(), ClientError> {
1846        let disconnect = Self::build_disconnect_packet(reason, properties);
1847        self.send_request_async(Request::DisconnectWithTimeout(disconnect, timeout))
1848            .await?;
1849        Ok(())
1850    }
1851
1852    async fn handle_disconnect_now(
1853        &self,
1854        reason: DisconnectReasonCode,
1855        properties: Option<DisconnectProperties>,
1856    ) -> Result<(), ClientError> {
1857        let disconnect = Self::build_disconnect_packet(reason, properties);
1858        self.send_immediate_disconnect_async(Request::DisconnectNow(disconnect))
1859            .await?;
1860        Ok(())
1861    }
1862
1863    /// Attempts to queue a graceful MQTT disconnect barrier with default
1864    /// `DisconnectReasonCode::NormalDisconnection`.
1865    ///
1866    /// Once the event loop observes this request, it stops processing later
1867    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
1868    /// for previously accepted outbound `QoS` 1/ `QoS` 2 publishes and tracked
1869    /// subscribe/unsubscribe requests to complete before sending MQTT
1870    /// `DISCONNECT`.
1871    ///
1872    /// This request uses the normal client request channel. Under publish
1873    /// flow-control pressure, it may pass earlier `QoS` 1/ `QoS` 2 publishes
1874    /// that are not currently sendable; once observed, it becomes the graceful
1875    /// drain barrier.
1876    ///
1877    /// # Errors
1878    ///
1879    /// Returns an error if the disconnect request cannot be queued
1880    /// immediately on the event loop.
1881    pub fn try_disconnect(&self) -> Result<(), ClientError> {
1882        self.handle_try_disconnect(DisconnectReasonCode::NormalDisconnection, None)
1883    }
1884
1885    /// Attempts to queue a graceful MQTT disconnect barrier with properties.
1886    ///
1887    /// Once the event loop observes this request, it stops processing later
1888    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
1889    /// for previously accepted outbound `QoS` 1/ `QoS` 2 publishes and tracked
1890    /// subscribe/unsubscribe requests to complete before sending MQTT
1891    /// `DISCONNECT`.
1892    ///
1893    /// This request uses the normal client request channel. Under publish
1894    /// flow-control pressure, it may pass earlier `QoS` 1/ `QoS` 2 publishes
1895    /// that are not currently sendable; once observed, it becomes the graceful
1896    /// drain barrier.
1897    ///
1898    /// # Errors
1899    ///
1900    /// Returns an error if the disconnect request cannot be queued
1901    /// immediately on the event loop.
1902    pub fn try_disconnect_with_properties(
1903        &self,
1904        reason: DisconnectReasonCode,
1905        properties: DisconnectProperties,
1906    ) -> Result<(), ClientError> {
1907        self.handle_try_disconnect(reason, Some(properties))
1908    }
1909
1910    /// Attempts to queue a graceful MQTT disconnect with a drain timeout.
1911    ///
1912    /// Once the event loop observes this request, it stops processing later
1913    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
1914    /// up to `timeout` for previously accepted outbound `QoS` 1/ `QoS` 2 publishes
1915    /// and tracked subscribe/unsubscribe requests to complete. `QoS` 1 publishes
1916    /// complete on `PUBACK`, `QoS` 2 publishes complete on `PUBCOMP`, tracked
1917    /// subscribes complete on `SUBACK`, and tracked unsubscribes complete on
1918    /// `UNSUBACK`.
1919    ///
1920    /// If the drain completes before the deadline, the event loop sends and
1921    /// flushes MQTT `DISCONNECT`. If the deadline expires first, polling returns
1922    /// `ConnectionError::DisconnectTimeout` and MQTT `DISCONNECT` is not sent.
1923    ///
1924    /// This request uses the normal client request channel. The timeout starts
1925    /// only after the event loop observes this request, not necessarily when
1926    /// this method queues it.
1927    ///
1928    /// # Errors
1929    ///
1930    /// Returns an error if the disconnect request cannot be queued
1931    /// immediately on the event loop.
1932    pub fn try_disconnect_with_timeout(&self, timeout: Duration) -> Result<(), ClientError> {
1933        self.handle_try_disconnect_with_timeout(
1934            DisconnectReasonCode::NormalDisconnection,
1935            None,
1936            timeout,
1937        )
1938    }
1939
1940    /// Attempts to queue a graceful MQTT disconnect with properties and a drain timeout.
1941    ///
1942    /// Once the event loop observes this request, it stops processing later
1943    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
1944    /// up to `timeout` for previously accepted outbound `QoS` 1/ `QoS` 2 publishes
1945    /// and tracked subscribe/unsubscribe requests to complete. `QoS` 1 publishes
1946    /// complete on `PUBACK`, `QoS` 2 publishes complete on `PUBCOMP`, tracked
1947    /// subscribes complete on `SUBACK`, and tracked unsubscribes complete on
1948    /// `UNSUBACK`.
1949    ///
1950    /// If the drain completes before the deadline, the event loop sends and
1951    /// flushes MQTT `DISCONNECT`. If the deadline expires first, polling returns
1952    /// `ConnectionError::DisconnectTimeout` and MQTT `DISCONNECT` is not sent.
1953    ///
1954    /// This request uses the normal client request channel. The timeout starts
1955    /// only after the event loop observes this request, not necessarily when
1956    /// this method queues it.
1957    ///
1958    /// # Errors
1959    ///
1960    /// Returns an error if the disconnect request cannot be queued
1961    /// immediately on the event loop.
1962    pub fn try_disconnect_with_properties_timeout(
1963        &self,
1964        reason: DisconnectReasonCode,
1965        properties: DisconnectProperties,
1966        timeout: Duration,
1967    ) -> Result<(), ClientError> {
1968        self.handle_try_disconnect_with_timeout(reason, Some(properties), timeout)
1969    }
1970
1971    /// Attempts to queue an immediate MQTT disconnect.
1972    ///
1973    /// This request uses a dedicated immediate shutdown channel, not the normal
1974    /// application request channel. It may bypass queued application work and
1975    /// does not wait for unresolved `QoS` 1/ `QoS` 2 publish handshakes.
1976    ///
1977    /// # Errors
1978    ///
1979    /// Returns an error if the disconnect request cannot be queued
1980    /// immediately on the event loop.
1981    pub fn try_disconnect_now(&self) -> Result<(), ClientError> {
1982        self.handle_try_disconnect_now(DisconnectReasonCode::NormalDisconnection, None)
1983    }
1984
1985    /// Attempts to queue an immediate MQTT disconnect with properties.
1986    ///
1987    /// This request uses a dedicated immediate shutdown channel, not the normal
1988    /// application request channel. It may bypass queued application work and
1989    /// does not wait for unresolved `QoS` 1/ `QoS` 2 publish handshakes.
1990    ///
1991    /// # Errors
1992    ///
1993    /// Returns an error if the disconnect request cannot be queued
1994    /// immediately on the event loop.
1995    pub fn try_disconnect_now_with_properties(
1996        &self,
1997        reason: DisconnectReasonCode,
1998        properties: DisconnectProperties,
1999    ) -> Result<(), ClientError> {
2000        self.handle_try_disconnect_now(reason, Some(properties))
2001    }
2002
2003    // Handle disconnect interface which can have properties or not
2004    fn handle_try_disconnect(
2005        &self,
2006        reason: DisconnectReasonCode,
2007        properties: Option<DisconnectProperties>,
2008    ) -> Result<(), ClientError> {
2009        let request = Self::build_disconnect_request(reason, properties);
2010        self.try_send_request(request)?;
2011        Ok(())
2012    }
2013
2014    fn handle_try_disconnect_with_timeout(
2015        &self,
2016        reason: DisconnectReasonCode,
2017        properties: Option<DisconnectProperties>,
2018        timeout: Duration,
2019    ) -> Result<(), ClientError> {
2020        let disconnect = Self::build_disconnect_packet(reason, properties);
2021        self.try_send_request(Request::DisconnectWithTimeout(disconnect, timeout))?;
2022        Ok(())
2023    }
2024
2025    fn handle_try_disconnect_now(
2026        &self,
2027        reason: DisconnectReasonCode,
2028        properties: Option<DisconnectProperties>,
2029    ) -> Result<(), ClientError> {
2030        let disconnect = Self::build_disconnect_packet(reason, properties);
2031        self.try_send_immediate_disconnect(Request::DisconnectNow(disconnect))?;
2032        Ok(())
2033    }
2034
2035    // Helper function to build disconnect request
2036    fn build_disconnect_request(
2037        reason: DisconnectReasonCode,
2038        properties: Option<DisconnectProperties>,
2039    ) -> Request {
2040        Request::Disconnect(Self::build_disconnect_packet(reason, properties))
2041    }
2042
2043    fn build_disconnect_packet(
2044        reason: DisconnectReasonCode,
2045        properties: Option<DisconnectProperties>,
2046    ) -> Disconnect {
2047        properties.map_or_else(
2048            || Disconnect::new(reason),
2049            |p| Disconnect::new_with_properties(reason, p),
2050        )
2051    }
2052}
2053
2054const fn prepare_ack(publish: &Publish) -> Option<ManualAck> {
2055    let ack = match publish.qos {
2056        QoS::AtMostOnce => return None,
2057        QoS::AtLeastOnce => ManualAck::PubAck(PubAck::new(publish.pkid, None)),
2058        QoS::ExactlyOnce => ManualAck::PubRec(PubRec::new(publish.pkid, None)),
2059    };
2060    Some(ack)
2061}
2062
2063/// A synchronous client, communicates with MQTT `EventLoop`.
2064///
2065/// This is cloneable and can be used to synchronously [`publish`](`AsyncClient::publish`),
2066/// [`subscribe`](`AsyncClient::subscribe`) through the `EventLoop`/`Connection`, which is to be polled in parallel
2067/// by iterating over the object returned by [`Connection.iter()`](Connection::iter) in a separate thread.
2068///
2069/// **NOTE**: The `EventLoop`/`Connection` must be regularly polled(`.next()` in case of `Connection`) in order
2070/// to send, receive and process packets from the broker, i.e. move ahead.
2071///
2072/// An asynchronous channel handle can also be extracted if necessary.
2073#[derive(Clone)]
2074pub struct Client {
2075    client: AsyncClient,
2076}
2077
2078impl Client {
2079    /// Create a builder for a [`Client`].
2080    ///
2081    /// The returned [`ClientBuilder`] only builds the synchronous client
2082    /// pair, which keeps the terminal `build()` method aligned with this
2083    /// entry point.
2084    #[must_use]
2085    pub const fn builder(options: MqttOptions) -> ClientBuilder {
2086        ClientBuilder::new(options)
2087    }
2088
2089    /// Create a new `Client` from a channel `Sender`.
2090    ///
2091    /// This is mostly useful for creating a test instance where you can
2092    /// listen on the corresponding receiver.
2093    #[must_use]
2094    pub const fn from_sender(request_tx: Sender<Request>) -> Self {
2095        Self {
2096            client: AsyncClient::from_senders(request_tx),
2097        }
2098    }
2099
2100    /// Sends a MQTT Publish to the `EventLoop`
2101    fn handle_publish<T, P>(
2102        &self,
2103        topic: T,
2104        qos: QoS,
2105        retain: bool,
2106        payload: P,
2107        properties: Option<PublishProperties>,
2108    ) -> Result<(), ClientError>
2109    where
2110        T: Into<PublishTopic>,
2111        P: Into<Bytes>,
2112    {
2113        let (topic, needs_validation) = topic.into().into_string_and_validation();
2114        let invalid_topic = (needs_validation && !valid_topic(&topic))
2115            || empty_topic_without_valid_alias(&topic, properties.as_ref());
2116        let mut publish = Publish::new(topic, qos, payload, properties);
2117        publish.retain = retain;
2118        let request = Request::Publish(publish);
2119
2120        if invalid_topic {
2121            return Err(ClientError::Request(Box::new(request)));
2122        }
2123
2124        self.client.send_request(request)?;
2125        Ok(())
2126    }
2127
2128    /// Sends a MQTT Publish with properties to the `EventLoop`.
2129    ///
2130    /// # Errors
2131    ///
2132    /// Returns an error if the topic or topic alias usage is invalid, or if
2133    /// the request cannot be queued on the event loop.
2134    pub fn publish_with_properties<T, P>(
2135        &self,
2136        topic: T,
2137        qos: QoS,
2138        retain: bool,
2139        payload: P,
2140        properties: PublishProperties,
2141    ) -> Result<(), ClientError>
2142    where
2143        T: Into<PublishTopic>,
2144        P: Into<Bytes>,
2145    {
2146        self.handle_publish(topic, qos, retain, payload, Some(properties))
2147    }
2148
2149    /// Sends a MQTT Publish to the `EventLoop`.
2150    ///
2151    /// # Errors
2152    ///
2153    /// Returns an error if the topic or topic alias usage is invalid, or if
2154    /// the request cannot be queued on the event loop.
2155    pub fn publish<T, P>(
2156        &self,
2157        topic: T,
2158        qos: QoS,
2159        retain: bool,
2160        payload: P,
2161    ) -> Result<(), ClientError>
2162    where
2163        T: Into<PublishTopic>,
2164        P: Into<Bytes>,
2165    {
2166        self.handle_publish(topic, qos, retain, payload, None)
2167    }
2168
2169    /// Attempts to send a MQTT Publish with properties to the `EventLoop`.
2170    ///
2171    /// # Errors
2172    ///
2173    /// Returns an error if the topic or topic alias usage is invalid, or if
2174    /// the request cannot be queued immediately on the event loop.
2175    pub fn try_publish_with_properties<T, P>(
2176        &self,
2177        topic: T,
2178        qos: QoS,
2179        retain: bool,
2180        payload: P,
2181        properties: PublishProperties,
2182    ) -> Result<(), ClientError>
2183    where
2184        T: Into<PublishTopic>,
2185        P: Into<Bytes>,
2186    {
2187        self.client
2188            .try_publish_with_properties(topic, qos, retain, payload, properties)
2189    }
2190
2191    /// Attempts to send a MQTT Publish to the `EventLoop`.
2192    ///
2193    /// # Errors
2194    ///
2195    /// Returns an error if the topic or topic alias usage is invalid, or if
2196    /// the request cannot be queued immediately on the event loop.
2197    pub fn try_publish<T, P>(
2198        &self,
2199        topic: T,
2200        qos: QoS,
2201        retain: bool,
2202        payload: P,
2203    ) -> Result<(), ClientError>
2204    where
2205        T: Into<PublishTopic>,
2206        P: Into<Bytes>,
2207    {
2208        self.client.try_publish(topic, qos, retain, payload)
2209    }
2210
2211    /// Prepares a MQTT PubAck/PubRec packet for manual acknowledgement.
2212    ///
2213    /// Returns `None` for `QoS0` publishes, which do not require acknowledgement.
2214    pub const fn prepare_ack(&self, publish: &Publish) -> Option<ManualAck> {
2215        self.client.prepare_ack(publish)
2216    }
2217
2218    /// Sends a prepared MQTT PubAck/PubRec to the `EventLoop`.
2219    ///
2220    /// # Errors
2221    ///
2222    /// Returns an error if the acknowledgement cannot be queued on the event
2223    /// loop.
2224    pub fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
2225        self.client.send_request(ack.into_request())?;
2226        Ok(())
2227    }
2228
2229    /// Attempts to send a prepared MQTT PubAck/PubRec to the `EventLoop`.
2230    ///
2231    /// # Errors
2232    ///
2233    /// Returns an error if the acknowledgement cannot be queued immediately on
2234    /// the event loop.
2235    pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
2236        self.client.try_manual_ack(ack)?;
2237        Ok(())
2238    }
2239
2240    /// Sends a MQTT PubAck/PubRec to the `EventLoop` based on publish `QoS`.
2241    /// Only needed if the `manual_acks` flag is set.
2242    ///
2243    /// # Errors
2244    ///
2245    /// Returns an error if the derived acknowledgement cannot be queued on the
2246    /// event loop.
2247    pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
2248        if let Some(ack) = self.prepare_ack(publish) {
2249            self.manual_ack(ack)?;
2250        }
2251        Ok(())
2252    }
2253
2254    /// Attempts to send a MQTT PubAck/PubRec to the `EventLoop` based on publish `QoS`.
2255    /// Only needed if the `manual_acks` flag is set.
2256    ///
2257    /// # Errors
2258    ///
2259    /// Returns an error if the derived acknowledgement cannot be queued
2260    /// immediately on the event loop.
2261    pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
2262        if let Some(ack) = self.prepare_ack(publish) {
2263            self.try_manual_ack(ack)?;
2264        }
2265        Ok(())
2266    }
2267
2268    /// Sends a MQTT AUTH packet to the `EventLoop`.
2269    ///
2270    /// # Errors
2271    ///
2272    /// Returns an error if the AUTH packet cannot be queued on the event loop.
2273    pub fn reauth(&self, properties: Option<AuthProperties>) -> Result<(), ClientError> {
2274        let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
2275        let auth = Request::Auth(auth);
2276        self.client.send_request(auth)?;
2277        Ok(())
2278    }
2279
2280    /// Sends a tracked MQTT re-authentication request to the `EventLoop`.
2281    ///
2282    /// # Errors
2283    ///
2284    /// Returns an error if the AUTH packet cannot be queued on the event loop
2285    /// or if tracked request notices are unavailable for this client instance.
2286    pub fn reauth_tracked(
2287        &self,
2288        properties: Option<AuthProperties>,
2289    ) -> Result<AuthNotice, ClientError> {
2290        let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
2291        self.client.send_tracked_auth(auth)
2292    }
2293
2294    /// Attempts to send a MQTT AUTH packet to the `EventLoop`.
2295    ///
2296    /// # Errors
2297    ///
2298    /// Returns an error if the AUTH packet cannot be queued immediately on the
2299    /// event loop.
2300    pub fn try_reauth(&self, properties: Option<AuthProperties>) -> Result<(), ClientError> {
2301        let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
2302        let auth = Request::Auth(auth);
2303        self.client.try_send_request(auth)?;
2304        Ok(())
2305    }
2306
2307    /// Attempts to send a tracked MQTT re-authentication request to the `EventLoop`.
2308    ///
2309    /// # Errors
2310    ///
2311    /// Returns an error if the AUTH packet cannot be queued immediately on the
2312    /// event loop or if tracked request notices are unavailable for this client
2313    /// instance.
2314    pub fn try_reauth_tracked(
2315        &self,
2316        properties: Option<AuthProperties>,
2317    ) -> Result<AuthNotice, ClientError> {
2318        let auth = Auth::new(AuthReasonCode::ReAuthenticate, properties);
2319        self.client.try_send_tracked_auth(auth)
2320    }
2321
2322    /// Sends a MQTT Subscribe to the `EventLoop`
2323    fn handle_subscribe<S: Into<String>>(
2324        &self,
2325        topic: S,
2326        qos: QoS,
2327        properties: Option<SubscribeProperties>,
2328    ) -> Result<(), ClientError> {
2329        let filter = Filter::new(topic, qos);
2330        let subscribe = Subscribe::new(filter, properties);
2331        if !subscribe_has_valid_filters(&subscribe) {
2332            return Err(ClientError::Request(Box::new(subscribe.into())));
2333        }
2334
2335        self.client.send_request(subscribe.into())?;
2336        Ok(())
2337    }
2338
2339    /// Sends a MQTT Subscribe with properties to the `EventLoop`.
2340    ///
2341    /// # Errors
2342    ///
2343    /// Returns an error if the topic filter is invalid or if the request
2344    /// cannot be queued on the event loop.
2345    pub fn subscribe_with_properties<S: Into<String>>(
2346        &self,
2347        topic: S,
2348        qos: QoS,
2349        properties: SubscribeProperties,
2350    ) -> Result<(), ClientError> {
2351        self.handle_subscribe(topic, qos, Some(properties))
2352    }
2353
2354    /// Sends a MQTT Subscribe to the `EventLoop`.
2355    ///
2356    /// # Errors
2357    ///
2358    /// Returns an error if the topic filter is invalid or if the request
2359    /// cannot be queued on the event loop.
2360    pub fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
2361        self.handle_subscribe(topic, qos, None)
2362    }
2363
2364    /// Attempts to send a MQTT Subscribe with properties to the `EventLoop`.
2365    ///
2366    /// # Errors
2367    ///
2368    /// Returns an error if the topic filter is invalid or if the request
2369    /// cannot be queued immediately on the event loop.
2370    pub fn try_subscribe_with_properties<S: Into<String>>(
2371        &self,
2372        topic: S,
2373        qos: QoS,
2374        properties: SubscribeProperties,
2375    ) -> Result<(), ClientError> {
2376        self.client
2377            .try_subscribe_with_properties(topic, qos, properties)
2378    }
2379
2380    /// Attempts to send a MQTT Subscribe to the `EventLoop`.
2381    ///
2382    /// # Errors
2383    ///
2384    /// Returns an error if the topic filter is invalid or if the request
2385    /// cannot be queued immediately on the event loop.
2386    pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
2387        self.client.try_subscribe(topic, qos)
2388    }
2389
2390    /// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
2391    fn handle_subscribe_many<T>(
2392        &self,
2393        topics: T,
2394        properties: Option<SubscribeProperties>,
2395    ) -> Result<(), ClientError>
2396    where
2397        T: IntoIterator<Item = Filter>,
2398    {
2399        let subscribe = Subscribe::new_many(topics, properties);
2400        if !subscribe_has_valid_filters(&subscribe) {
2401            return Err(ClientError::Request(Box::new(subscribe.into())));
2402        }
2403
2404        self.client.send_request(subscribe.into())?;
2405        Ok(())
2406    }
2407
2408    /// Sends a MQTT Subscribe for multiple topics with properties to the `EventLoop`.
2409    ///
2410    /// # Errors
2411    ///
2412    /// Returns an error if the filter list is invalid or if the request cannot
2413    /// be queued on the event loop.
2414    pub fn subscribe_many_with_properties<T>(
2415        &self,
2416        topics: T,
2417        properties: SubscribeProperties,
2418    ) -> Result<(), ClientError>
2419    where
2420        T: IntoIterator<Item = Filter>,
2421    {
2422        self.handle_subscribe_many(topics, Some(properties))
2423    }
2424
2425    /// Sends a MQTT Subscribe for multiple topics to the `EventLoop`.
2426    ///
2427    /// # Errors
2428    ///
2429    /// Returns an error if the filter list is invalid or if the request cannot
2430    /// be queued on the event loop.
2431    pub fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
2432    where
2433        T: IntoIterator<Item = Filter>,
2434    {
2435        self.handle_subscribe_many(topics, None)
2436    }
2437
2438    /// Attempts to send a MQTT Subscribe for multiple topics with properties to the `EventLoop`.
2439    ///
2440    /// # Errors
2441    ///
2442    /// Returns an error if the filter list is invalid or if the request cannot
2443    /// be queued immediately on the event loop.
2444    pub fn try_subscribe_many_with_properties<T>(
2445        &self,
2446        topics: T,
2447        properties: SubscribeProperties,
2448    ) -> Result<(), ClientError>
2449    where
2450        T: IntoIterator<Item = Filter>,
2451    {
2452        self.client
2453            .try_subscribe_many_with_properties(topics, properties)
2454    }
2455
2456    /// Attempts to send a MQTT Subscribe for multiple topics to the `EventLoop`.
2457    ///
2458    /// # Errors
2459    ///
2460    /// Returns an error if the filter list is invalid or if the request cannot
2461    /// be queued immediately on the event loop.
2462    pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
2463    where
2464        T: IntoIterator<Item = Filter>,
2465    {
2466        self.client.try_subscribe_many(topics)
2467    }
2468
2469    /// Sends a MQTT Unsubscribe to the `EventLoop`
2470    fn handle_unsubscribe<S: Into<String>>(
2471        &self,
2472        topic: S,
2473        properties: Option<UnsubscribeProperties>,
2474    ) -> Result<(), ClientError> {
2475        let unsubscribe = Unsubscribe::new(topic, properties);
2476        let request = Request::Unsubscribe(unsubscribe);
2477        self.client.send_request(request)?;
2478        Ok(())
2479    }
2480
2481    /// Sends a MQTT Unsubscribe with properties to the `EventLoop`.
2482    ///
2483    /// # Errors
2484    ///
2485    /// Returns an error if the request cannot be queued on the event loop.
2486    pub fn unsubscribe_with_properties<S: Into<String>>(
2487        &self,
2488        topic: S,
2489        properties: UnsubscribeProperties,
2490    ) -> Result<(), ClientError> {
2491        self.handle_unsubscribe(topic, Some(properties))
2492    }
2493
2494    /// Sends a MQTT Unsubscribe to the `EventLoop`.
2495    ///
2496    /// # Errors
2497    ///
2498    /// Returns an error if the request cannot be queued on the event loop.
2499    pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
2500        self.handle_unsubscribe(topic, None)
2501    }
2502
2503    /// Attempts to send a MQTT Unsubscribe with properties to the `EventLoop`.
2504    ///
2505    /// # Errors
2506    ///
2507    /// Returns an error if the request cannot be queued immediately on the
2508    /// event loop.
2509    pub fn try_unsubscribe_with_properties<S: Into<String>>(
2510        &self,
2511        topic: S,
2512        properties: UnsubscribeProperties,
2513    ) -> Result<(), ClientError> {
2514        self.client
2515            .try_unsubscribe_with_properties(topic, properties)
2516    }
2517
2518    /// Attempts to send a MQTT Unsubscribe to the `EventLoop`.
2519    ///
2520    /// # Errors
2521    ///
2522    /// Returns an error if the request cannot be queued immediately on the
2523    /// event loop.
2524    pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
2525        self.client.try_unsubscribe(topic)
2526    }
2527
2528    /// Queues a graceful MQTT disconnect barrier.
2529    ///
2530    /// Once the event loop observes this request, it stops processing later
2531    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
2532    /// for previously accepted outbound `QoS` 1/ `QoS` 2 publishes and tracked
2533    /// subscribe/unsubscribe requests to complete before sending MQTT
2534    /// `DISCONNECT`.
2535    ///
2536    /// This request uses the normal client request channel. Under publish
2537    /// flow-control pressure, it may pass earlier `QoS` 1/ `QoS` 2 publishes
2538    /// that are not currently sendable; once observed, it becomes the graceful
2539    /// drain barrier.
2540    ///
2541    /// # Errors
2542    ///
2543    /// Returns an error if the disconnect request cannot be queued on the
2544    /// event loop.
2545    pub fn disconnect(&self) -> Result<(), ClientError> {
2546        self.handle_disconnect(DisconnectReasonCode::NormalDisconnection, None)
2547    }
2548
2549    /// Queues a graceful MQTT disconnect barrier with properties.
2550    ///
2551    /// Once the event loop observes this request, it stops processing later
2552    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
2553    /// for previously accepted outbound `QoS` 1/ `QoS` 2 publishes and tracked
2554    /// subscribe/unsubscribe requests to complete before sending MQTT
2555    /// `DISCONNECT`.
2556    ///
2557    /// This request uses the normal client request channel. Under publish
2558    /// flow-control pressure, it may pass earlier `QoS` 1/ `QoS` 2 publishes
2559    /// that are not currently sendable; once observed, it becomes the graceful
2560    /// drain barrier.
2561    ///
2562    /// # Errors
2563    ///
2564    /// Returns an error if the disconnect request cannot be queued on the
2565    /// event loop.
2566    pub fn disconnect_with_properties(
2567        &self,
2568        reason: DisconnectReasonCode,
2569        properties: DisconnectProperties,
2570    ) -> Result<(), ClientError> {
2571        self.handle_disconnect(reason, Some(properties))
2572    }
2573
2574    /// Queues a graceful MQTT disconnect barrier with a drain timeout.
2575    ///
2576    /// Once the event loop observes this request, it stops processing later
2577    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
2578    /// up to `timeout` for previously accepted outbound `QoS` 1/ `QoS` 2 publishes
2579    /// and tracked subscribe/unsubscribe requests to complete. `QoS` 1 publishes
2580    /// complete on `PUBACK`, `QoS` 2 publishes complete on `PUBCOMP`, tracked
2581    /// subscribes complete on `SUBACK`, and tracked unsubscribes complete on
2582    /// `UNSUBACK`.
2583    ///
2584    /// If the drain completes before the deadline, the event loop sends and
2585    /// flushes MQTT `DISCONNECT`. If the deadline expires first, polling returns
2586    /// `ConnectionError::DisconnectTimeout` and MQTT `DISCONNECT` is not sent.
2587    ///
2588    /// This request uses the normal client request channel. The timeout starts
2589    /// only after the event loop observes this request, not necessarily when
2590    /// this method queues it.
2591    ///
2592    /// # Errors
2593    ///
2594    /// Returns an error if the disconnect request cannot be queued on the
2595    /// event loop.
2596    pub fn disconnect_with_timeout(&self, timeout: Duration) -> Result<(), ClientError> {
2597        self.handle_disconnect_with_timeout(
2598            DisconnectReasonCode::NormalDisconnection,
2599            None,
2600            timeout,
2601        )
2602    }
2603
2604    /// Queues a graceful MQTT disconnect barrier with properties and a drain timeout.
2605    ///
2606    /// Once the event loop observes this request, it stops processing later
2607    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
2608    /// up to `timeout` for previously accepted outbound `QoS` 1/ `QoS` 2 publishes
2609    /// and tracked subscribe/unsubscribe requests to complete. `QoS` 1 publishes
2610    /// complete on `PUBACK`, `QoS` 2 publishes complete on `PUBCOMP`, tracked
2611    /// subscribes complete on `SUBACK`, and tracked unsubscribes complete on
2612    /// `UNSUBACK`.
2613    ///
2614    /// If the drain completes before the deadline, the event loop sends and
2615    /// flushes MQTT `DISCONNECT`. If the deadline expires first, polling returns
2616    /// `ConnectionError::DisconnectTimeout` and MQTT `DISCONNECT` is not sent.
2617    ///
2618    /// This request uses the normal client request channel. The timeout starts
2619    /// only after the event loop observes this request, not necessarily when
2620    /// this method queues it.
2621    ///
2622    /// # Errors
2623    ///
2624    /// Returns an error if the disconnect request cannot be queued on the
2625    /// event loop.
2626    pub fn disconnect_with_properties_timeout(
2627        &self,
2628        reason: DisconnectReasonCode,
2629        properties: DisconnectProperties,
2630        timeout: Duration,
2631    ) -> Result<(), ClientError> {
2632        self.handle_disconnect_with_timeout(reason, Some(properties), timeout)
2633    }
2634
2635    /// Sends a MQTT disconnect immediately without waiting for in-flight requests.
2636    ///
2637    /// This request uses a dedicated immediate shutdown channel, not the normal
2638    /// application request channel. It may bypass queued application work and
2639    /// does not wait for unresolved `QoS` 1/ `QoS` 2 publish handshakes.
2640    ///
2641    /// # Errors
2642    ///
2643    /// Returns an error if the disconnect request cannot be queued on the
2644    /// event loop.
2645    pub fn disconnect_now(&self) -> Result<(), ClientError> {
2646        self.handle_disconnect_now(DisconnectReasonCode::NormalDisconnection, None)
2647    }
2648
2649    /// Sends a MQTT disconnect with properties immediately without waiting for in-flight requests.
2650    ///
2651    /// This request uses a dedicated immediate shutdown channel, not the normal
2652    /// application request channel. It may bypass queued application work and
2653    /// does not wait for unresolved `QoS` 1/ `QoS` 2 publish handshakes.
2654    ///
2655    /// # Errors
2656    ///
2657    /// Returns an error if the disconnect request cannot be queued on the
2658    /// event loop.
2659    pub fn disconnect_now_with_properties(
2660        &self,
2661        reason: DisconnectReasonCode,
2662        properties: DisconnectProperties,
2663    ) -> Result<(), ClientError> {
2664        self.handle_disconnect_now(reason, Some(properties))
2665    }
2666
2667    fn handle_disconnect(
2668        &self,
2669        reason: DisconnectReasonCode,
2670        properties: Option<DisconnectProperties>,
2671    ) -> Result<(), ClientError> {
2672        let request = AsyncClient::build_disconnect_request(reason, properties);
2673        self.client.send_request(request)?;
2674        Ok(())
2675    }
2676
2677    fn handle_disconnect_with_timeout(
2678        &self,
2679        reason: DisconnectReasonCode,
2680        properties: Option<DisconnectProperties>,
2681        timeout: Duration,
2682    ) -> Result<(), ClientError> {
2683        let disconnect = AsyncClient::build_disconnect_packet(reason, properties);
2684        self.client
2685            .send_request(Request::DisconnectWithTimeout(disconnect, timeout))?;
2686        Ok(())
2687    }
2688
2689    fn handle_disconnect_now(
2690        &self,
2691        reason: DisconnectReasonCode,
2692        properties: Option<DisconnectProperties>,
2693    ) -> Result<(), ClientError> {
2694        let disconnect = AsyncClient::build_disconnect_packet(reason, properties);
2695        self.client
2696            .send_immediate_disconnect(Request::DisconnectNow(disconnect))?;
2697        Ok(())
2698    }
2699
2700    /// Attempts to queue a graceful MQTT disconnect barrier.
2701    ///
2702    /// Once the event loop observes this request, it stops processing later
2703    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
2704    /// for previously accepted outbound `QoS` 1/ `QoS` 2 publishes and tracked
2705    /// subscribe/unsubscribe requests to complete before sending MQTT
2706    /// `DISCONNECT`.
2707    ///
2708    /// This request uses the normal client request channel. Under publish
2709    /// flow-control pressure, it may pass earlier `QoS` 1/ `QoS` 2 publishes
2710    /// that are not currently sendable; once observed, it becomes the graceful
2711    /// drain barrier.
2712    ///
2713    /// # Errors
2714    ///
2715    /// Returns an error if the disconnect request cannot be queued
2716    /// immediately on the event loop.
2717    pub fn try_disconnect(&self) -> Result<(), ClientError> {
2718        self.client.try_disconnect()
2719    }
2720
2721    /// Attempts to queue a graceful MQTT disconnect barrier with properties.
2722    ///
2723    /// Once the event loop observes this request, it stops processing later
2724    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
2725    /// for previously accepted outbound `QoS` 1/ `QoS` 2 publishes and tracked
2726    /// subscribe/unsubscribe requests to complete before sending MQTT
2727    /// `DISCONNECT`.
2728    ///
2729    /// This request uses the normal client request channel. Under publish
2730    /// flow-control pressure, it may pass earlier `QoS` 1/ `QoS` 2 publishes
2731    /// that are not currently sendable; once observed, it becomes the graceful
2732    /// drain barrier.
2733    ///
2734    /// # Errors
2735    ///
2736    /// Returns an error if the disconnect request cannot be queued
2737    /// immediately on the event loop.
2738    pub fn try_disconnect_with_properties(
2739        &self,
2740        reason: DisconnectReasonCode,
2741        properties: DisconnectProperties,
2742    ) -> Result<(), ClientError> {
2743        self.client.handle_try_disconnect(reason, Some(properties))
2744    }
2745
2746    /// Attempts to queue a graceful MQTT disconnect with a drain timeout.
2747    ///
2748    /// Once the event loop observes this request, it stops processing later
2749    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
2750    /// up to `timeout` for previously accepted outbound `QoS` 1/ `QoS` 2 publishes
2751    /// and tracked subscribe/unsubscribe requests to complete. `QoS` 1 publishes
2752    /// complete on `PUBACK`, `QoS` 2 publishes complete on `PUBCOMP`, tracked
2753    /// subscribes complete on `SUBACK`, and tracked unsubscribes complete on
2754    /// `UNSUBACK`.
2755    ///
2756    /// If the drain completes before the deadline, the event loop sends and
2757    /// flushes MQTT `DISCONNECT`. If the deadline expires first, polling returns
2758    /// `ConnectionError::DisconnectTimeout` and MQTT `DISCONNECT` is not sent.
2759    ///
2760    /// This request uses the normal client request channel. The timeout starts
2761    /// only after the event loop observes this request, not necessarily when
2762    /// this method queues it.
2763    ///
2764    /// # Errors
2765    ///
2766    /// Returns an error if the disconnect request cannot be queued
2767    /// immediately on the event loop.
2768    pub fn try_disconnect_with_timeout(&self, timeout: Duration) -> Result<(), ClientError> {
2769        self.client.try_disconnect_with_timeout(timeout)
2770    }
2771
2772    /// Attempts to queue a graceful MQTT disconnect with properties and a drain timeout.
2773    ///
2774    /// Once the event loop observes this request, it stops processing later
2775    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
2776    /// up to `timeout` for previously accepted outbound `QoS` 1/ `QoS` 2 publishes
2777    /// and tracked subscribe/unsubscribe requests to complete. `QoS` 1 publishes
2778    /// complete on `PUBACK`, `QoS` 2 publishes complete on `PUBCOMP`, tracked
2779    /// subscribes complete on `SUBACK`, and tracked unsubscribes complete on
2780    /// `UNSUBACK`.
2781    ///
2782    /// If the drain completes before the deadline, the event loop sends and
2783    /// flushes MQTT `DISCONNECT`. If the deadline expires first, polling returns
2784    /// `ConnectionError::DisconnectTimeout` and MQTT `DISCONNECT` is not sent.
2785    ///
2786    /// This request uses the normal client request channel. The timeout starts
2787    /// only after the event loop observes this request, not necessarily when
2788    /// this method queues it.
2789    ///
2790    /// # Errors
2791    ///
2792    /// Returns an error if the disconnect request cannot be queued
2793    /// immediately on the event loop.
2794    pub fn try_disconnect_with_properties_timeout(
2795        &self,
2796        reason: DisconnectReasonCode,
2797        properties: DisconnectProperties,
2798        timeout: Duration,
2799    ) -> Result<(), ClientError> {
2800        self.client
2801            .handle_try_disconnect_with_timeout(reason, Some(properties), timeout)
2802    }
2803
2804    /// Attempts to queue an immediate MQTT disconnect.
2805    ///
2806    /// This request uses a dedicated immediate shutdown channel, not the normal
2807    /// application request channel. It may bypass queued application work and
2808    /// does not wait for unresolved `QoS` 1/ `QoS` 2 publish handshakes.
2809    ///
2810    /// # Errors
2811    ///
2812    /// Returns an error if the disconnect request cannot be queued
2813    /// immediately on the event loop.
2814    pub fn try_disconnect_now(&self) -> Result<(), ClientError> {
2815        self.client.try_disconnect_now()
2816    }
2817
2818    /// Attempts to queue an immediate MQTT disconnect with properties.
2819    ///
2820    /// This request uses a dedicated immediate shutdown channel, not the normal
2821    /// application request channel. It may bypass queued application work and
2822    /// does not wait for unresolved `QoS` 1/ `QoS` 2 publish handshakes.
2823    ///
2824    /// # Errors
2825    ///
2826    /// Returns an error if the disconnect request cannot be queued
2827    /// immediately on the event loop.
2828    pub fn try_disconnect_now_with_properties(
2829        &self,
2830        reason: DisconnectReasonCode,
2831        properties: DisconnectProperties,
2832    ) -> Result<(), ClientError> {
2833        self.client
2834            .handle_try_disconnect_now(reason, Some(properties))
2835    }
2836}
2837
2838#[must_use]
2839fn empty_topic_without_valid_alias(topic: &str, properties: Option<&PublishProperties>) -> bool {
2840    topic.is_empty()
2841        && properties
2842            .and_then(|props| props.topic_alias)
2843            .unwrap_or_default()
2844            == 0
2845}
2846
2847#[must_use]
2848fn subscribe_has_valid_filters(subscribe: &Subscribe) -> bool {
2849    !subscribe.filters.is_empty()
2850        && subscribe
2851            .filters
2852            .iter()
2853            .all(|filter| valid_filter(&filter.path))
2854}
2855
/// Error type returned by [`Connection::recv`].
///
/// Signals that all request senders have been dropped, so the request channel
/// is closed.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` into boxed or wrapped error types.
#[derive(Debug, Eq, PartialEq)]
pub struct RecvError;

impl std::fmt::Display for RecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("request channel closed")
    }
}

impl std::error::Error for RecvError {}
2859
/// Error type returned by [`Connection::try_recv`].
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` into boxed or wrapped error types.
#[derive(Debug, Eq, PartialEq)]
pub enum TryRecvError {
    /// The request channel is closed: all request senders have been dropped.
    Disconnected,
    /// No event was immediately ready.
    Empty,
}

impl std::fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::Disconnected => "request channel closed",
            Self::Empty => "no event immediately available",
        })
    }
}

impl std::error::Error for TryRecvError {}
2868
/// Error type returned by [`Connection::recv_timeout`].
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` into boxed or wrapped error types.
#[derive(Debug, Eq, PartialEq)]
pub enum RecvTimeoutError {
    /// The request channel is closed: all request senders have been dropped.
    Disconnected,
    /// No event arrived before the requested duration elapsed.
    Timeout,
}

impl std::fmt::Display for RecvTimeoutError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::Disconnected => "request channel closed",
            Self::Timeout => "timed out waiting for an event",
        })
    }
}

impl std::error::Error for RecvTimeoutError {}
2877
2878///  MQTT connection. Maintains all the necessary state
2879pub struct Connection {
2880    pub eventloop: EventLoop,
2881    runtime: Runtime,
2882}
2883impl Connection {
2884    const fn new(eventloop: EventLoop, runtime: Runtime) -> Self {
2885        Self { eventloop, runtime }
2886    }
2887
2888    /// Returns an iterator over this connection. Iterating over this is all that's
2889    /// necessary to make connection progress and maintain a robust connection.
2890    /// Just continuing to loop will reconnect
2891    /// **NOTE** Don't block this while iterating
2892    // ideally this should be named iter_mut because it requires a mutable reference
2893    // Also we can implement IntoIter for this to make it easy to iterate over it
2894    #[must_use = "Connection should be iterated over a loop to make progress"]
2895    pub const fn iter(&mut self) -> Iter<'_> {
2896        Iter { connection: self }
2897    }
2898
2899    /// Attempt to fetch an incoming [`Event`] on the [`EventLoop`], returning an error
2900    /// if all clients/users have closed requests channel.
2901    ///
2902    /// [`EventLoop`]: super::EventLoop
2903    ///
2904    /// # Errors
2905    ///
2906    /// Returns [`RecvError`] if all request senders have been dropped.
2907    pub fn recv(&mut self) -> Result<Result<Event, ConnectionError>, RecvError> {
2908        let f = self.eventloop.poll();
2909        let event = self.runtime.block_on(f);
2910
2911        resolve_event(event).ok_or(RecvError)
2912    }
2913
2914    /// Attempt to fetch an incoming [`Event`] on the [`EventLoop`], returning an error
2915    /// if none immediately present or all clients/users have closed requests channel.
2916    ///
2917    /// [`EventLoop`]: super::EventLoop
2918    ///
2919    /// # Errors
2920    ///
2921    /// Returns [`TryRecvError::Empty`] if no event is immediately ready, or
2922    /// [`TryRecvError::Disconnected`] if all request senders have been dropped.
2923    pub fn try_recv(&mut self) -> Result<Result<Event, ConnectionError>, TryRecvError> {
2924        let f = self.eventloop.poll();
2925        // Enters the runtime context so we can poll the future, as required by `now_or_never()`.
2926        // ref: https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#method.enter
2927        let _guard = self.runtime.enter();
2928        let event = f.now_or_never().ok_or(TryRecvError::Empty)?;
2929
2930        resolve_event(event).ok_or(TryRecvError::Disconnected)
2931    }
2932
2933    /// Attempt to fetch an incoming [`Event`] on the [`EventLoop`], returning an error
2934    /// if all clients/users have closed requests channel or the timeout has expired.
2935    ///
2936    /// [`EventLoop`]: super::EventLoop
2937    ///
2938    /// # Errors
2939    ///
2940    /// Returns [`RecvTimeoutError::Timeout`] if no event arrives before
2941    /// `duration`, or [`RecvTimeoutError::Disconnected`] if all request
2942    /// senders have been dropped.
2943    pub fn recv_timeout(
2944        &mut self,
2945        duration: Duration,
2946    ) -> Result<Result<Event, ConnectionError>, RecvTimeoutError> {
2947        let f = self.eventloop.poll();
2948        let event = self
2949            .runtime
2950            .block_on(async { timeout(duration, f).await })
2951            .map_err(|_| RecvTimeoutError::Timeout)?;
2952
2953        resolve_event(event).ok_or(RecvTimeoutError::Disconnected)
2954    }
2955}
2956
2957fn resolve_event(event: Result<Event, ConnectionError>) -> Option<Result<Event, ConnectionError>> {
2958    match event {
2959        Ok(v) => Some(Ok(v)),
2960        // closing of request channel should stop the iterator
2961        Err(ConnectionError::RequestsDone) => {
2962            trace!("Done with requests");
2963            None
2964        }
2965        Err(e) => Some(Err(e)),
2966    }
2967}
2968
2969/// Iterator which polls the `EventLoop` for connection progress
2970pub struct Iter<'a> {
2971    connection: &'a mut Connection,
2972}
2973
2974impl Iterator for Iter<'_> {
2975    type Item = Result<Event, ConnectionError>;
2976
2977    fn next(&mut self) -> Option<Self::Item> {
2978        self.connection.recv().ok()
2979    }
2980}
2981
2982#[cfg(test)]
2983mod test {
2984    use crate::mqttbytes::v5::{
2985        LastWill, PubAckProperties, PubAckReason, PubRecProperties, PubRecReason,
2986    };
2987
2988    use super::*;
2989
2990    #[test]
2991    fn calling_iter_twice_on_connection_shouldnt_panic() {
2992        let mut mqttoptions = MqttOptions::new("test-1", "localhost");
2993        let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false, None);
2994        mqttoptions.set_keep_alive(5).set_last_will(will);
2995
2996        let (_, mut connection) = Client::builder(mqttoptions).capacity(10).build();
2997        let _ = connection.iter();
2998        let _ = connection.iter();
2999    }
3000
3001    #[test]
3002    fn builder_uses_options_request_channel_capacity_by_default() {
3003        let mut mqttoptions = MqttOptions::new("test-1", "localhost");
3004        mqttoptions.set_request_channel_capacity(1);
3005        let builder: AsyncClientBuilder = AsyncClient::builder(mqttoptions);
3006        let (client, _eventloop) = builder.build();
3007
3008        client
3009            .try_publish("hello/world", QoS::AtMostOnce, false, "one")
3010            .expect("first request should fit configured capacity");
3011        assert!(matches!(
3012            client.try_publish("hello/world", QoS::AtMostOnce, false, "two"),
3013            Err(ClientError::TryRequest(request)) if matches!(*request, Request::Publish(_))
3014        ));
3015    }
3016
3017    #[test]
3018    fn sync_and_async_entry_points_return_distinct_builder_types() {
3019        let sync_builder = Client::builder(MqttOptions::new("test-sync", "localhost"));
3020        let async_builder = AsyncClient::builder(MqttOptions::new("test-async", "localhost"));
3021
3022        let _: ClientBuilder = sync_builder;
3023        let _: AsyncClientBuilder = async_builder;
3024    }
3025
3026    #[test]
3027    fn builder_capacity_overrides_options_request_channel_capacity() {
3028        let mut mqttoptions = MqttOptions::new("test-1", "localhost");
3029        mqttoptions.set_request_channel_capacity(1);
3030        let (client, _eventloop) = Client::builder(mqttoptions).capacity(2).build();
3031
3032        client
3033            .try_publish("hello/world", QoS::AtMostOnce, false, "one")
3034            .expect("first request should fit overridden capacity");
3035        client
3036            .try_publish("hello/world", QoS::AtMostOnce, false, "two")
3037            .expect("second request should fit overridden capacity");
3038        assert!(matches!(
3039            client.try_publish("hello/world", QoS::AtMostOnce, false, "three"),
3040            Err(ClientError::TryRequest(request)) if matches!(*request, Request::Publish(_))
3041        ));
3042    }
3043
3044    #[test]
3045    fn builder_capacity_zero_is_bounded_rendezvous() {
3046        let mqttoptions = MqttOptions::new("test-1", "localhost");
3047        let (client, _eventloop) = AsyncClient::builder(mqttoptions).capacity(0).build();
3048
3049        assert!(matches!(
3050            client.try_publish("hello/world", QoS::AtMostOnce, false, "one"),
3051            Err(ClientError::TryRequest(request)) if matches!(*request, Request::Publish(_))
3052        ));
3053    }
3054
3055    #[test]
3056    fn unbounded_builder_allows_try_publish_without_polling() {
3057        let mqttoptions = MqttOptions::new("test-1", "localhost");
3058        let (client, _eventloop) = AsyncClient::builder(mqttoptions).unbounded().build();
3059
3060        for i in 0..128 {
3061            client
3062                .try_publish("hello/world", QoS::AtMostOnce, false, vec![i])
3063                .expect("unbounded channel should accept requests without polling");
3064        }
3065    }
3066
3067    #[tokio::test]
3068    async fn bounded_publish_blocks_when_channel_is_full_without_polling() {
3069        let mqttoptions = MqttOptions::new("test-1", "localhost");
3070        let (client, _eventloop) = AsyncClient::builder(mqttoptions).capacity(1).build();
3071
3072        client
3073            .publish("hello/world", QoS::AtMostOnce, false, "one")
3074            .await
3075            .expect("first request should fit bounded channel");
3076
3077        let result = tokio::time::timeout(
3078            std::time::Duration::from_millis(25),
3079            client.publish("hello/world", QoS::AtMostOnce, false, "two"),
3080        )
3081        .await;
3082        assert!(result.is_err());
3083    }
3084
3085    #[tokio::test]
3086    async fn unbounded_publish_completes_without_polling() {
3087        let mqttoptions = MqttOptions::new("test-1", "localhost");
3088        let (client, _eventloop) = AsyncClient::builder(mqttoptions).unbounded().build();
3089
3090        for i in 0..128 {
3091            client
3092                .publish("hello/world", QoS::AtMostOnce, false, vec![i])
3093                .await
3094                .expect("unbounded channel should accept requests without polling");
3095        }
3096    }
3097
3098    #[test]
3099    fn should_be_able_to_build_test_client_from_channel() {
3100        let (tx, rx) = flume::bounded(1);
3101        let client = Client::from_sender(tx);
3102        client
3103            .publish("hello/world", QoS::ExactlyOnce, false, "good bye")
3104            .expect("Should be able to publish");
3105        let _ = rx.try_recv().expect("Should have message");
3106    }
3107
3108    #[test]
3109    fn prepare_ack_maps_qos_to_manual_ack_packets_v5() {
3110        let (tx, _) = flume::bounded(1);
3111        let client = Client::from_sender(tx);
3112
3113        let qos0 = Publish::new("hello/world", QoS::AtMostOnce, vec![1], None);
3114        assert!(client.prepare_ack(&qos0).is_none());
3115
3116        let mut qos1 = Publish::new("hello/world", QoS::AtLeastOnce, vec![1], None);
3117        qos1.pkid = 7;
3118        match client.prepare_ack(&qos1) {
3119            Some(ManualAck::PubAck(ack)) => assert_eq!(ack.pkid, 7),
3120            ack => panic!("expected QoS1 PubAck, got {ack:?}"),
3121        }
3122
3123        let mut qos2 = Publish::new("hello/world", QoS::ExactlyOnce, vec![1], None);
3124        qos2.pkid = 9;
3125        match client.prepare_ack(&qos2) {
3126            Some(ManualAck::PubRec(ack)) => assert_eq!(ack.pkid, 9),
3127            ack => panic!("expected QoS2 PubRec, got {ack:?}"),
3128        }
3129    }
3130
3131    #[test]
3132    fn manual_ack_sends_custom_puback_reason_and_properties() {
3133        let (tx, rx) = flume::bounded(1);
3134        let client = Client::from_sender(tx);
3135
3136        let expected_properties = PubAckProperties {
3137            reason_string: Some("no downstream subscribers".to_owned()),
3138            user_properties: vec![("source".to_owned(), "unit-test".to_owned())],
3139        };
3140        let mut ack = PubAck::new(41, None);
3141        ack.reason = PubAckReason::NoMatchingSubscribers;
3142        ack.properties = Some(expected_properties.clone());
3143
3144        client
3145            .manual_ack(ManualAck::PubAck(ack))
3146            .expect("manual_ack should send request");
3147
3148        let request = rx.try_recv().expect("Should have ack request");
3149        match request {
3150            Request::PubAck(ack) => {
3151                assert_eq!(ack.pkid, 41);
3152                assert_eq!(ack.reason, PubAckReason::NoMatchingSubscribers);
3153                assert_eq!(ack.properties, Some(expected_properties));
3154            }
3155            request => panic!("Expected PubAck request, got {request:?}"),
3156        }
3157    }
3158
3159    #[test]
3160    fn try_manual_ack_sends_custom_pubrec_reason_and_properties() {
3161        let (tx, rx) = flume::bounded(1);
3162        let client = Client::from_sender(tx);
3163
3164        let expected_properties = PubRecProperties {
3165            reason_string: Some("queued for qos2 flow".to_owned()),
3166            user_properties: vec![("source".to_owned(), "unit-test".to_owned())],
3167        };
3168        let mut ack = PubRec::new(52, None);
3169        ack.reason = PubRecReason::ImplementationSpecificError;
3170        ack.properties = Some(expected_properties.clone());
3171
3172        client
3173            .try_manual_ack(ManualAck::PubRec(ack))
3174            .expect("try_manual_ack should send request");
3175
3176        let request = rx.try_recv().expect("Should have ack request");
3177        match request {
3178            Request::PubRec(ack) => {
3179                assert_eq!(ack.pkid, 52);
3180                assert_eq!(ack.reason, PubRecReason::ImplementationSpecificError);
3181                assert_eq!(ack.properties, Some(expected_properties));
3182            }
3183            request => panic!("Expected PubRec request, got {request:?}"),
3184        }
3185    }
3186
3187    #[test]
3188    fn ack_and_try_ack_send_default_success_packets_v5() {
3189        let (tx, rx) = flume::bounded(2);
3190        let client = Client::from_sender(tx);
3191
3192        let mut qos1 = Publish::new("hello/world", QoS::AtLeastOnce, vec![1], None);
3193        qos1.pkid = 11;
3194        client.ack(&qos1).expect("ack should send PubAck");
3195
3196        let mut qos2 = Publish::new("hello/world", QoS::ExactlyOnce, vec![1], None);
3197        qos2.pkid = 13;
3198        client
3199            .try_ack(&qos2)
3200            .expect("try_ack should send PubRec request");
3201
3202        let first = rx.try_recv().expect("Should receive first ack request");
3203        match first {
3204            Request::PubAck(ack) => {
3205                assert_eq!(ack.pkid, 11);
3206                assert_eq!(ack.reason, PubAckReason::Success);
3207                assert_eq!(ack.properties, None);
3208            }
3209            request => panic!("Expected PubAck request, got {request:?}"),
3210        }
3211
3212        let second = rx.try_recv().expect("Should receive second ack request");
3213        match second {
3214            Request::PubRec(ack) => {
3215                assert_eq!(ack.pkid, 13);
3216                assert_eq!(ack.reason, PubRecReason::Success);
3217                assert_eq!(ack.properties, None);
3218            }
3219            request => panic!("Expected PubRec request, got {request:?}"),
3220        }
3221    }
3222
3223    #[test]
3224    fn test_reauth() {
3225        let (client, mut connection) = Client::builder(MqttOptions::new("test-1", "localhost"))
3226            .capacity(10)
3227            .build();
3228        let props = AuthProperties {
3229            method: Some("test".to_string()),
3230            data: Some(Bytes::from("test")),
3231            reason: None,
3232            user_properties: vec![],
3233        };
3234        client
3235            .reauth(Some(props.clone()))
3236            .expect("Should be able to reauth");
3237        let _ = connection.iter().next().expect("Should have event");
3238
3239        client
3240            .try_reauth(Some(props))
3241            .expect("Should be able to reauth");
3242        let _ = connection.iter().next().expect("Should have event");
3243    }
3244
3245    #[test]
3246    fn can_publish_with_validated_topic() {
3247        let (tx, rx) = flume::bounded(1);
3248        let client = Client::from_sender(tx);
3249        let valid_topic = ValidatedTopic::new("hello/world").unwrap();
3250        client
3251            .publish(valid_topic, QoS::ExactlyOnce, false, "good bye")
3252            .expect("Should be able to publish");
3253        let _ = rx.try_recv().expect("Should have message");
3254    }
3255
3256    #[test]
3257    fn publish_accepts_borrowed_string_topic() {
3258        let (tx, rx) = flume::bounded(2);
3259        let client = Client::from_sender(tx);
3260        let topic = "hello/world".to_string();
3261        client
3262            .publish(&topic, QoS::ExactlyOnce, false, "good bye")
3263            .expect("Should be able to publish");
3264        client
3265            .try_publish(&topic, QoS::ExactlyOnce, false, "good bye")
3266            .expect("Should be able to publish");
3267        let _ = rx.try_recv().expect("Should have message");
3268        let _ = rx.try_recv().expect("Should have message");
3269    }
3270
3271    #[test]
3272    fn publish_accepts_cow_topic_variants() {
3273        let (tx, rx) = flume::bounded(2);
3274        let client = Client::from_sender(tx);
3275        client
3276            .publish(
3277                std::borrow::Cow::Borrowed("hello/world"),
3278                QoS::ExactlyOnce,
3279                false,
3280                "good bye",
3281            )
3282            .expect("Should be able to publish");
3283        client
3284            .try_publish(
3285                std::borrow::Cow::Owned("hello/world".to_owned()),
3286                QoS::ExactlyOnce,
3287                false,
3288                "good bye",
3289            )
3290            .expect("Should be able to publish");
3291        let _ = rx.try_recv().expect("Should have message");
3292        let _ = rx.try_recv().expect("Should have message");
3293    }
3294
3295    #[test]
3296    fn publishing_invalid_cow_topic_fails() {
3297        let (tx, _) = flume::bounded(1);
3298        let client = Client::from_sender(tx);
3299        let err = client
3300            .publish(
3301                std::borrow::Cow::Borrowed("a/+/b"),
3302                QoS::ExactlyOnce,
3303                false,
3304                "good bye",
3305            )
3306            .expect_err("Invalid publish topic should fail");
3307        assert!(matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_))));
3308    }
3309
3310    #[test]
3311    fn validated_topic_ergonomics() {
3312        let valid_topic = ValidatedTopic::new("hello/world").unwrap();
3313        let valid_topic_can_be_cloned = valid_topic.clone();
3314        // ValidatedTopic can be compared
3315        assert_eq!(valid_topic, valid_topic_can_be_cloned);
3316    }
3317
3318    #[test]
3319    fn creating_invalid_validated_topic_fails() {
3320        assert_eq!(
3321            ValidatedTopic::new("a/+/b"),
3322            Err(InvalidTopic("a/+/b".to_string()))
3323        );
3324    }
3325
3326    #[test]
3327    fn publish_with_properties_accepts_validated_topic() {
3328        let (tx, rx) = flume::bounded(1);
3329        let client = Client::from_sender(tx);
3330        let valid_topic = ValidatedTopic::new("hello/world").unwrap();
3331        client
3332            .publish_with_properties(
3333                valid_topic,
3334                QoS::ExactlyOnce,
3335                false,
3336                "good bye",
3337                PublishProperties::default(),
3338            )
3339            .expect("Should be able to publish");
3340        let _ = rx.try_recv().expect("Should have message");
3341    }
3342
3343    #[test]
3344    fn publish_with_properties_empty_topic_requires_nonzero_alias() {
3345        let (tx, _) = flume::bounded(1);
3346        let client = Client::from_sender(tx);
3347
3348        let err = client
3349            .publish_with_properties(
3350                "",
3351                QoS::AtMostOnce,
3352                false,
3353                "good bye",
3354                PublishProperties::default(),
3355            )
3356            .expect_err("Empty topic without topic alias should fail");
3357        assert!(matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_))));
3358
3359        let err = client
3360            .publish_with_properties(
3361                "",
3362                QoS::AtMostOnce,
3363                false,
3364                "good bye",
3365                PublishProperties {
3366                    topic_alias: Some(0),
3367                    ..Default::default()
3368                },
3369            )
3370            .expect_err("Empty topic with topic alias 0 should fail");
3371        assert!(matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_))));
3372    }
3373
3374    #[test]
3375    fn publish_with_properties_empty_topic_accepts_nonzero_alias() {
3376        let (tx, rx) = flume::bounded(1);
3377        let client = Client::from_sender(tx);
3378
3379        client
3380            .publish_with_properties(
3381                "",
3382                QoS::AtMostOnce,
3383                false,
3384                "good bye",
3385                PublishProperties {
3386                    topic_alias: Some(1),
3387                    ..Default::default()
3388                },
3389            )
3390            .expect("Empty topic with non-zero topic alias should be accepted");
3391
3392        let request = rx.try_recv().expect("Should have message");
3393        match request {
3394            Request::Publish(publish) => {
3395                assert!(publish.topic.is_empty());
3396                assert_eq!(
3397                    publish
3398                        .properties
3399                        .as_ref()
3400                        .and_then(|properties| properties.topic_alias),
3401                    Some(1)
3402                );
3403            }
3404            request => panic!("Expected Publish request, got {request:?}"),
3405        }
3406    }
3407
3408    #[test]
3409    fn try_publish_accepts_validated_topic() {
3410        let (tx, rx) = flume::bounded(1);
3411        let client = Client::from_sender(tx);
3412        let valid_topic = ValidatedTopic::new("hello/world").unwrap();
3413        client
3414            .try_publish(valid_topic, QoS::ExactlyOnce, false, "good bye")
3415            .expect("Should be able to publish");
3416        let _ = rx.try_recv().expect("Should have message");
3417    }
3418
3419    #[test]
3420    fn try_publish_with_properties_accepts_validated_topic() {
3421        let (tx, rx) = flume::bounded(1);
3422        let client = Client::from_sender(tx);
3423        let valid_topic = ValidatedTopic::new("hello/world").unwrap();
3424        client
3425            .try_publish_with_properties(
3426                valid_topic,
3427                QoS::ExactlyOnce,
3428                false,
3429                "good bye",
3430                PublishProperties::default(),
3431            )
3432            .expect("Should be able to publish");
3433        let _ = rx.try_recv().expect("Should have message");
3434    }
3435
3436    #[test]
3437    fn try_publish_with_properties_empty_topic_requires_nonzero_alias() {
3438        let (tx, _) = flume::bounded(1);
3439        let client = Client::from_sender(tx);
3440
3441        let err = client
3442            .try_publish_with_properties(
3443                "",
3444                QoS::AtMostOnce,
3445                false,
3446                "good bye",
3447                PublishProperties::default(),
3448            )
3449            .expect_err("Empty topic without topic alias should fail");
3450        assert!(matches!(err, ClientError::TryRequest(req) if matches!(*req, Request::Publish(_))));
3451    }
3452
3453    #[test]
3454    fn publishing_invalid_raw_topic_fails() {
3455        let (tx, _) = flume::bounded(1);
3456        let client = Client::from_sender(tx);
3457        let err = client
3458            .publish("a/+/b", QoS::ExactlyOnce, false, "good bye")
3459            .expect_err("Invalid publish topic should fail");
3460        assert!(matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_))));
3461    }
3462
3463    #[test]
3464    fn async_publish_paths_accept_validated_topic() {
3465        let (tx, rx) = flume::bounded(4);
3466        let client = AsyncClient::from_senders(tx);
3467        let runtime = runtime::Builder::new_current_thread()
3468            .enable_all()
3469            .build()
3470            .unwrap();
3471
3472        runtime.block_on(async {
3473            client
3474                .publish(
3475                    ValidatedTopic::new("hello/world").unwrap(),
3476                    QoS::ExactlyOnce,
3477                    false,
3478                    "good bye",
3479                )
3480                .await
3481                .expect("Should be able to publish");
3482
3483            client
3484                .publish_with_properties(
3485                    ValidatedTopic::new("hello/world").unwrap(),
3486                    QoS::ExactlyOnce,
3487                    false,
3488                    "good bye",
3489                    PublishProperties::default(),
3490                )
3491                .await
3492                .expect("Should be able to publish");
3493
3494            client
3495                .publish_bytes(
3496                    ValidatedTopic::new("hello/world").unwrap(),
3497                    QoS::ExactlyOnce,
3498                    false,
3499                    Bytes::from_static(b"good bye"),
3500                )
3501                .await
3502                .expect("Should be able to publish");
3503
3504            client
3505                .publish_bytes_with_properties(
3506                    ValidatedTopic::new("hello/world").unwrap(),
3507                    QoS::ExactlyOnce,
3508                    false,
3509                    Bytes::from_static(b"good bye"),
3510                    PublishProperties::default(),
3511                )
3512                .await
3513                .expect("Should be able to publish");
3514        });
3515
3516        let _ = rx.try_recv().expect("Should have message");
3517        let _ = rx.try_recv().expect("Should have message");
3518        let _ = rx.try_recv().expect("Should have message");
3519        let _ = rx.try_recv().expect("Should have message");
3520    }
3521
3522    #[test]
3523    fn async_try_publish_paths_accept_validated_topic() {
3524        let (tx, rx) = flume::bounded(4);
3525        let client = AsyncClient::from_senders(tx);
3526
3527        client
3528            .try_publish(
3529                ValidatedTopic::new("hello/world").unwrap(),
3530                QoS::ExactlyOnce,
3531                false,
3532                "good bye",
3533            )
3534            .expect("Should be able to publish");
3535
3536        client
3537            .try_publish_with_properties(
3538                ValidatedTopic::new("hello/world").unwrap(),
3539                QoS::ExactlyOnce,
3540                false,
3541                "good bye",
3542                PublishProperties::default(),
3543            )
3544            .expect("Should be able to publish");
3545
3546        let _ = rx.try_recv().expect("Should have message");
3547        let _ = rx.try_recv().expect("Should have message");
3548    }
3549
3550    #[test]
3551    fn async_publishing_invalid_raw_topic_fails() {
3552        let (tx, _) = flume::bounded(1);
3553        let client = AsyncClient::from_senders(tx);
3554        let runtime = runtime::Builder::new_current_thread()
3555            .enable_all()
3556            .build()
3557            .unwrap();
3558
3559        runtime.block_on(async {
3560            let err = client
3561                .publish("a/+/b", QoS::ExactlyOnce, false, "good bye")
3562                .await
3563                .expect_err("Invalid publish topic should fail");
3564            assert!(
3565                matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_)))
3566            );
3567
3568            let err = client
3569                .publish_bytes(
3570                    "a/+/b",
3571                    QoS::ExactlyOnce,
3572                    false,
3573                    Bytes::from_static(b"good bye"),
3574                )
3575                .await
3576                .expect_err("Invalid publish topic should fail");
3577            assert!(
3578                matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_)))
3579            );
3580
3581            let err = client
3582                .publish_with_properties(
3583                    "",
3584                    QoS::AtMostOnce,
3585                    false,
3586                    "good bye",
3587                    PublishProperties::default(),
3588                )
3589                .await
3590                .expect_err("Empty topic without topic alias should fail");
3591            assert!(
3592                matches!(err, ClientError::Request(req) if matches!(*req, Request::Publish(_)))
3593            );
3594        });
3595    }
3596
3597    #[test]
3598    fn disconnect_with_properties_builds_disconnect_request() {
3599        let (tx, rx) = flume::bounded(1);
3600        let client = Client::from_sender(tx);
3601        let properties = DisconnectProperties {
3602            session_expiry_interval: Some(120),
3603            reason_string: Some("closing".to_string()),
3604            user_properties: vec![("source".to_string(), "test".to_string())],
3605            server_reference: Some("backup-broker".to_string()),
3606        };
3607
3608        client
3609            .disconnect_with_properties(
3610                DisconnectReasonCode::ImplementationSpecificError,
3611                properties.clone(),
3612            )
3613            .expect("disconnect_with_properties should enqueue request");
3614
3615        let request = rx.try_recv().expect("Should have disconnect request");
3616        match request {
3617            Request::Disconnect(disconnect) => {
3618                assert_eq!(
3619                    disconnect.reason_code,
3620                    DisconnectReasonCode::ImplementationSpecificError
3621                );
3622                assert_eq!(disconnect.properties, Some(properties));
3623            }
3624            request => panic!("Expected disconnect request, got {request:?}"),
3625        }
3626    }
3627
3628    #[test]
3629    fn try_disconnect_with_properties_builds_disconnect_request() {
3630        let (tx, rx) = flume::bounded(1);
3631        let client = Client::from_sender(tx);
3632        let properties = DisconnectProperties {
3633            session_expiry_interval: Some(360),
3634            reason_string: Some("maintenance".to_string()),
3635            user_properties: vec![("env".to_string(), "test".to_string())],
3636            server_reference: None,
3637        };
3638
3639        client
3640            .try_disconnect_with_properties(
3641                DisconnectReasonCode::ServerShuttingDown,
3642                properties.clone(),
3643            )
3644            .expect("try_disconnect_with_properties should enqueue request");
3645
3646        let request = rx.try_recv().expect("Should have disconnect request");
3647        match request {
3648            Request::Disconnect(disconnect) => {
3649                assert_eq!(
3650                    disconnect.reason_code,
3651                    DisconnectReasonCode::ServerShuttingDown
3652                );
3653                assert_eq!(disconnect.properties, Some(properties));
3654            }
3655            request => panic!("Expected disconnect request, got {request:?}"),
3656        }
3657    }
3658
3659    #[test]
3660    fn async_disconnect_with_properties_builds_disconnect_request() {
3661        let (tx, rx) = flume::bounded(1);
3662        let client = AsyncClient::from_senders(tx);
3663        let runtime = runtime::Builder::new_current_thread()
3664            .enable_all()
3665            .build()
3666            .unwrap();
3667        let properties = DisconnectProperties {
3668            session_expiry_interval: Some(42),
3669            reason_string: Some("done".to_string()),
3670            user_properties: vec![("k".to_string(), "v".to_string())],
3671            server_reference: Some("fallback".to_string()),
3672        };
3673
3674        runtime.block_on(async {
3675            client
3676                .disconnect_with_properties(
3677                    DisconnectReasonCode::UseAnotherServer,
3678                    properties.clone(),
3679                )
3680                .await
3681                .expect("disconnect_with_properties should enqueue request");
3682        });
3683
3684        let request = rx.try_recv().expect("Should have disconnect request");
3685        match request {
3686            Request::Disconnect(disconnect) => {
3687                assert_eq!(
3688                    disconnect.reason_code,
3689                    DisconnectReasonCode::UseAnotherServer
3690                );
3691                assert_eq!(disconnect.properties, Some(properties));
3692            }
3693            request => panic!("Expected disconnect request, got {request:?}"),
3694        }
3695    }
3696
3697    #[test]
3698    fn async_try_disconnect_with_properties_builds_disconnect_request() {
3699        let (tx, rx) = flume::bounded(1);
3700        let client = AsyncClient::from_senders(tx);
3701        let properties = DisconnectProperties {
3702            session_expiry_interval: Some(7),
3703            reason_string: Some("bye".to_string()),
3704            user_properties: vec![("actor".to_string(), "test".to_string())],
3705            server_reference: None,
3706        };
3707
3708        client
3709            .try_disconnect_with_properties(
3710                DisconnectReasonCode::AdministrativeAction,
3711                properties.clone(),
3712            )
3713            .expect("try_disconnect_with_properties should enqueue request");
3714
3715        let request = rx.try_recv().expect("Should have disconnect request");
3716        match request {
3717            Request::Disconnect(disconnect) => {
3718                assert_eq!(
3719                    disconnect.reason_code,
3720                    DisconnectReasonCode::AdministrativeAction
3721                );
3722                assert_eq!(disconnect.properties, Some(properties));
3723            }
3724            request => panic!("Expected disconnect request, got {request:?}"),
3725        }
3726    }
3727
3728    #[test]
3729    fn tracked_publish_requires_tracking_channel() {
3730        let (tx, _) = flume::bounded(2);
3731        let client = AsyncClient::from_senders(tx);
3732        let runtime = runtime::Builder::new_current_thread()
3733            .enable_all()
3734            .build()
3735            .unwrap();
3736
3737        runtime.block_on(async {
3738            let err = client
3739                .publish_tracked("hello/world", QoS::AtLeastOnce, false, "good bye")
3740                .await
3741                .expect_err("tracked publish should fail without tracked channel");
3742            assert!(matches!(err, ClientError::TrackingUnavailable));
3743
3744            let err = client
3745                .publish_bytes_tracked(
3746                    "hello/world",
3747                    QoS::AtLeastOnce,
3748                    false,
3749                    Bytes::from_static(b"good bye"),
3750                )
3751                .await
3752                .expect_err("tracked publish bytes should fail without tracked channel");
3753            assert!(matches!(err, ClientError::TrackingUnavailable));
3754
3755            let err = client
3756                .subscribe_tracked("hello/world", QoS::AtLeastOnce)
3757                .await
3758                .expect_err("tracked subscribe should fail without tracked channel");
3759            assert!(matches!(err, ClientError::TrackingUnavailable));
3760
3761            let err = client
3762                .subscribe_many_tracked(vec![Filter::new("hello/world", QoS::AtLeastOnce)])
3763                .await
3764                .expect_err("tracked subscribe many should fail without tracked channel");
3765            assert!(matches!(err, ClientError::TrackingUnavailable));
3766
3767            let err = client
3768                .unsubscribe_tracked("hello/world")
3769                .await
3770                .expect_err("tracked unsubscribe should fail without tracked channel");
3771            assert!(matches!(err, ClientError::TrackingUnavailable));
3772        });
3773
3774        let err = client
3775            .try_subscribe_tracked("hello/world", QoS::AtLeastOnce)
3776            .expect_err("tracked try_subscribe should fail without tracked channel");
3777        assert!(matches!(err, ClientError::TrackingUnavailable));
3778
3779        let err = client
3780            .try_subscribe_many_tracked(vec![Filter::new("hello/world", QoS::AtLeastOnce)])
3781            .expect_err("tracked try_subscribe_many should fail without tracked channel");
3782        assert!(matches!(err, ClientError::TrackingUnavailable));
3783
3784        let err = client
3785            .try_unsubscribe_tracked("hello/world")
3786            .expect_err("tracked try_unsubscribe should fail without tracked channel");
3787        assert!(matches!(err, ClientError::TrackingUnavailable));
3788    }
3789
3790    #[test]
3791    fn tracked_unsubscribe_uses_control_request_channel() {
3792        let (requests, requests_rx) = flume::bounded(1);
3793        let (control_requests, control_requests_rx) = flume::bounded(1);
3794        let (immediate_disconnect, _immediate_disconnect_rx) = flume::unbounded();
3795        let client = AsyncClient {
3796            request_tx: RequestSender::WithNotice {
3797                requests,
3798                control_requests,
3799                immediate_disconnect,
3800            },
3801        };
3802        let runtime = runtime::Builder::new_current_thread()
3803            .enable_all()
3804            .build()
3805            .unwrap();
3806
3807        runtime
3808            .block_on(client.unsubscribe_tracked("hello/world"))
3809            .expect("tracked unsubscribe should enqueue");
3810
3811        assert!(requests_rx.is_empty());
3812        let envelope = control_requests_rx
3813            .try_recv()
3814            .expect("tracked unsubscribe should use control channel");
3815        assert!(matches!(envelope.into_parts().0, Request::Unsubscribe(_)));
3816    }
3817
3818    #[test]
3819    fn tracked_auth_uses_control_request_channel() {
3820        let (requests, requests_rx) = flume::bounded(1);
3821        let (control_requests, control_requests_rx) = flume::bounded(1);
3822        let (immediate_disconnect, _immediate_disconnect_rx) = flume::unbounded();
3823        let client = AsyncClient {
3824            request_tx: RequestSender::WithNotice {
3825                requests,
3826                control_requests,
3827                immediate_disconnect,
3828            },
3829        };
3830        let runtime = runtime::Builder::new_current_thread()
3831            .enable_all()
3832            .build()
3833            .unwrap();
3834
3835        runtime
3836            .block_on(client.reauth_tracked(None))
3837            .expect("tracked auth should enqueue");
3838
3839        assert!(requests_rx.is_empty());
3840        let envelope = control_requests_rx
3841            .try_recv()
3842            .expect("tracked auth should use control channel");
3843        assert!(matches!(envelope.into_parts().0, Request::Auth(_)));
3844    }
3845}