Skip to main content

rumqttc/
client.rs

//! This module offers high-level synchronous and asynchronous client abstractions
//! over the async event loop.
3use std::borrow::Cow;
4use std::time::Duration;
5
6use crate::eventloop::{RequestChannelCapacity, RequestEnvelope};
7use crate::mqttbytes::{
8    QoS,
9    v4::{Disconnect, PubAck, PubRec, Publish, Subscribe, SubscribeFilter, Unsubscribe},
10};
11use crate::notice::{PublishNoticeTx, SubscribeNoticeTx, UnsubscribeNoticeTx};
12use crate::{
13    ConnectionError, Event, EventLoop, MqttOptions, PublishNotice, Request, SubscribeNotice,
14    UnsubscribeNotice, valid_filter, valid_topic,
15};
16
17use bytes::Bytes;
18use flume::{SendError, Sender, TrySendError};
19use futures_util::FutureExt;
20use tokio::runtime::{self, Runtime};
21use tokio::time::timeout;
22
23/// An error returned when a topic string fails validation against the MQTT specification.
24#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)]
25#[error("Invalid MQTT topic: '{0}'")]
26pub struct InvalidTopic(String);
27
/// Owned topic string that has already passed MQTT topic validation.
///
/// Wrapping a frequently used topic once avoids paying for validation on
/// every publish. Values are created through [`ValidatedTopic::new`], which
/// runs the check a single time.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ValidatedTopic(String);
35
36impl ValidatedTopic {
37    /// Constructs a new `ValidatedTopic` after validating the input string.
38    ///
39    /// # Errors
40    ///
41    /// Returns [`InvalidTopic`] if the topic string does not conform to the MQTT specification.
42    pub fn new<S: Into<String>>(topic: S) -> Result<Self, InvalidTopic> {
43        let topic_string = topic.into();
44        if valid_publish_topic(&topic_string) {
45            Ok(Self(topic_string))
46        } else {
47            Err(InvalidTopic(topic_string))
48        }
49    }
50}
51
52impl From<ValidatedTopic> for String {
53    fn from(topic: ValidatedTopic) -> Self {
54        topic.0
55    }
56}
57
58/// Topic argument accepted by publish APIs.
59///
60/// `ValidatedTopic` variants skip per-call validation while string variants are
61/// validated for MQTT topic correctness.
62pub enum PublishTopic {
63    /// Raw topic input that must be validated before publishing.
64    Unvalidated(String),
65    /// Topic that has already been validated once.
66    Validated(ValidatedTopic),
67}
68
69impl PublishTopic {
70    fn into_string_and_validation(self) -> (String, bool) {
71        match self {
72            Self::Unvalidated(topic) => (topic, true),
73            Self::Validated(topic) => (topic.0, false),
74        }
75    }
76}
77
78impl From<ValidatedTopic> for PublishTopic {
79    fn from(topic: ValidatedTopic) -> Self {
80        Self::Validated(topic)
81    }
82}
83
84impl From<String> for PublishTopic {
85    fn from(topic: String) -> Self {
86        Self::Unvalidated(topic)
87    }
88}
89
90impl From<&str> for PublishTopic {
91    fn from(topic: &str) -> Self {
92        Self::Unvalidated(topic.to_owned())
93    }
94}
95
96impl From<&String> for PublishTopic {
97    fn from(topic: &String) -> Self {
98        Self::Unvalidated(topic.clone())
99    }
100}
101
102impl From<Cow<'_, str>> for PublishTopic {
103    fn from(topic: Cow<'_, str>) -> Self {
104        Self::Unvalidated(topic.into_owned())
105    }
106}
107
108/// Client Error
109#[derive(Debug, Eq, PartialEq, thiserror::Error)]
110pub enum ClientError {
111    #[error("Failed to send mqtt requests to eventloop")]
112    Request(Request),
113    #[error("Failed to send mqtt requests to eventloop")]
114    TryRequest(Request),
115    #[error("Tracked request API is unavailable for this client instance")]
116    TrackingUnavailable,
117}
118
119impl From<SendError<Request>> for ClientError {
120    fn from(e: SendError<Request>) -> Self {
121        Self::Request(e.into_inner())
122    }
123}
124
125impl From<TrySendError<Request>> for ClientError {
126    fn from(e: TrySendError<Request>) -> Self {
127        Self::TryRequest(e.into_inner())
128    }
129}
130
131#[derive(Clone, Debug)]
132enum RequestSender {
133    Plain(Sender<Request>),
134    WithNotice {
135        requests: Sender<RequestEnvelope>,
136        control_requests: Sender<RequestEnvelope>,
137        immediate_disconnect: Sender<RequestEnvelope>,
138    },
139}
140
141fn into_request(envelope: RequestEnvelope) -> Request {
142    let (request, _notice) = envelope.into_parts();
143    request
144}
145
146fn map_send_envelope_error(err: SendError<RequestEnvelope>) -> ClientError {
147    ClientError::Request(into_request(err.into_inner()))
148}
149
150fn map_try_send_envelope_error(err: TrySendError<RequestEnvelope>) -> ClientError {
151    match err {
152        TrySendError::Full(envelope) | TrySendError::Disconnected(envelope) => {
153            ClientError::TryRequest(into_request(envelope))
154        }
155    }
156}
157
158const fn is_publish_request(request: &Request) -> bool {
159    matches!(request, Request::Publish(_))
160}
161
162/// Prepared acknowledgement packet for manual acknowledgement mode.
163#[derive(Clone, Debug, PartialEq, Eq)]
164pub enum ManualAck {
165    PubAck(PubAck),
166    PubRec(PubRec),
167}
168
169impl ManualAck {
170    const fn into_request(self) -> Request {
171        match self {
172            Self::PubAck(ack) => Request::PubAck(ack),
173            Self::PubRec(rec) => Request::PubRec(rec),
174        }
175    }
176}
177
178/// An asynchronous client, communicates with MQTT `EventLoop`.
179///
180/// This is cloneable and can be used to asynchronously [`publish`](`AsyncClient::publish`),
181/// [`subscribe`](`AsyncClient::subscribe`) through the `EventLoop`, which is to be polled parallelly.
182///
183/// **NOTE**: The `EventLoop` must be regularly polled in order to send, receive and process packets
184/// from the broker, i.e. move ahead.
185///
186/// Bounded clients apply backpressure through the client request channel. If the
187/// same task that drives [`EventLoop::poll`](crate::EventLoop::poll) awaits
188/// request-sending APIs such as [`publish`](Self::publish),
189/// [`subscribe`](Self::subscribe), [`unsubscribe`](Self::unsubscribe), or
190/// [`ack`](Self::ack) while that channel is full, it can self-block: the send is
191/// waiting for the event loop to read a request, but the event loop cannot make
192/// progress until that same task polls it again. For bounded async clients,
193/// prefer driving the event loop in a dedicated task. Use [`try_publish`](Self::try_publish)
194/// when dropping outgoing publishes under overload is intended.
195///
196/// The request channel is an admission queue, not a strict global wire FIFO
197/// guarantee. Under publish flow-control pressure, non-`PUBLISH` control
198/// packets can pass earlier `QoS` 1/ `QoS` 2 publishes that are not currently
199/// sendable. Application publishes preserve FIFO with other publishes.
200#[derive(Clone, Debug)]
201pub struct AsyncClient {
202    request_tx: RequestSender,
203}
204
205/// Builder for synchronous MQTT clients.
206///
207/// The request channel is bounded by default using
208/// [`MqttOptions::request_channel_capacity`]. Use [`Self::capacity`] to override
209/// the bounded capacity, or [`Self::unbounded`] to opt into an unbounded request channel.
210#[derive(Debug)]
211pub struct ClientBuilder {
212    options: MqttOptions,
213    capacity: RequestChannelCapacity,
214}
215
216/// Builder for asynchronous MQTT clients.
217///
218/// The request channel is bounded by default using
219/// [`MqttOptions::request_channel_capacity`]. Use [`Self::capacity`] to override
220/// the bounded capacity, or [`Self::unbounded`] to opt into an unbounded request channel.
221#[derive(Debug)]
222pub struct AsyncClientBuilder {
223    options: MqttOptions,
224    capacity: RequestChannelCapacity,
225}
226
227#[must_use]
228fn build_async_client(
229    options: MqttOptions,
230    capacity: RequestChannelCapacity,
231) -> (AsyncClient, EventLoop) {
232    let (eventloop, request_tx, control_request_tx, immediate_disconnect_tx) =
233        EventLoop::new_for_async_client_with_capacity(options, capacity);
234    let client = AsyncClient {
235        request_tx: RequestSender::WithNotice {
236            requests: request_tx,
237            control_requests: control_request_tx,
238            immediate_disconnect: immediate_disconnect_tx,
239        },
240    };
241
242    (client, eventloop)
243}
244
245impl ClientBuilder {
246    /// Create a new builder for a synchronous [`Client`].
247    #[must_use]
248    pub const fn new(options: MqttOptions) -> Self {
249        let capacity = RequestChannelCapacity::Bounded(options.request_channel_capacity());
250        Self { options, capacity }
251    }
252
253    /// Use a bounded request channel with the given capacity.
254    ///
255    /// `0` creates a bounded zero-capacity rendezvous channel. Use [`Self::unbounded`]
256    /// for an unbounded request channel.
257    #[must_use]
258    pub const fn capacity(mut self, cap: usize) -> Self {
259        self.capacity = RequestChannelCapacity::Bounded(cap);
260        self
261    }
262
263    /// Use an unbounded request channel.
264    #[must_use]
265    pub const fn unbounded(mut self) -> Self {
266        self.capacity = RequestChannelCapacity::Unbounded;
267        self
268    }
269
270    /// Build a synchronous client and connection.
271    ///
272    /// This builder always produces the synchronous client pair so the
273    /// terminal `build()` method matches the entry point that created it.
274    ///
275    /// # Panics
276    ///
277    /// Panics if the current-thread Tokio runtime cannot be created.
278    #[must_use]
279    pub fn build(self) -> (Client, Connection) {
280        let (client, eventloop) = build_async_client(self.options, self.capacity);
281        let client = Client { client };
282        let runtime = runtime::Builder::new_current_thread()
283            .enable_all()
284            .build()
285            .unwrap();
286
287        let connection = Connection::new(eventloop, runtime);
288        (client, connection)
289    }
290}
291
292impl AsyncClientBuilder {
293    /// Create a new builder for an asynchronous [`AsyncClient`].
294    #[must_use]
295    pub const fn new(options: MqttOptions) -> Self {
296        let capacity = RequestChannelCapacity::Bounded(options.request_channel_capacity());
297        Self { options, capacity }
298    }
299
300    /// Use a bounded request channel with the given capacity.
301    ///
302    /// `0` creates a bounded zero-capacity rendezvous channel. Use [`Self::unbounded`]
303    /// for an unbounded request channel.
304    #[must_use]
305    pub const fn capacity(mut self, cap: usize) -> Self {
306        self.capacity = RequestChannelCapacity::Bounded(cap);
307        self
308    }
309
310    /// Use an unbounded request channel.
311    #[must_use]
312    pub const fn unbounded(mut self) -> Self {
313        self.capacity = RequestChannelCapacity::Unbounded;
314        self
315    }
316
317    /// Build an asynchronous client and event loop.
318    ///
319    /// This builder always produces the asynchronous client pair so the
320    /// terminal `build()` method matches the entry point that created it.
321    #[must_use]
322    pub fn build(self) -> (AsyncClient, EventLoop) {
323        build_async_client(self.options, self.capacity)
324    }
325}
326
327impl AsyncClient {
328    /// Create a builder for an [`AsyncClient`].
329    ///
330    /// The returned [`AsyncClientBuilder`] only builds the asynchronous
331    /// client pair, which keeps the terminal `build()` method aligned with
332    /// this entry point.
333    #[must_use]
334    pub const fn builder(options: MqttOptions) -> AsyncClientBuilder {
335        AsyncClientBuilder::new(options)
336    }
337
338    /// Create a new `AsyncClient` from a channel `Sender`.
339    ///
340    /// This is mostly useful for creating a test instance where you can
341    /// listen on the corresponding receiver.
342    #[must_use]
343    pub const fn from_senders(request_tx: Sender<Request>) -> Self {
344        Self {
345            request_tx: RequestSender::Plain(request_tx),
346        }
347    }
348
349    async fn send_request_async(&self, request: Request) -> Result<(), ClientError> {
350        match &self.request_tx {
351            RequestSender::Plain(tx) => tx.send_async(request).await.map_err(ClientError::from),
352            RequestSender::WithNotice {
353                requests,
354                control_requests,
355                ..
356            } => {
357                let tx = if is_publish_request(&request) {
358                    requests
359                } else {
360                    control_requests
361                };
362                tx.send_async(RequestEnvelope::plain(request))
363                    .await
364                    .map_err(map_send_envelope_error)
365            }
366        }
367    }
368
369    fn try_send_request(&self, request: Request) -> Result<(), ClientError> {
370        match &self.request_tx {
371            RequestSender::Plain(tx) => tx.try_send(request).map_err(ClientError::from),
372            RequestSender::WithNotice {
373                requests,
374                control_requests,
375                ..
376            } => {
377                let tx = if is_publish_request(&request) {
378                    requests
379                } else {
380                    control_requests
381                };
382                tx.try_send(RequestEnvelope::plain(request))
383                    .map_err(map_try_send_envelope_error)
384            }
385        }
386    }
387
388    fn send_request(&self, request: Request) -> Result<(), ClientError> {
389        match &self.request_tx {
390            RequestSender::Plain(tx) => tx.send(request).map_err(ClientError::from),
391            RequestSender::WithNotice {
392                requests,
393                control_requests,
394                ..
395            } => {
396                let tx = if is_publish_request(&request) {
397                    requests
398                } else {
399                    control_requests
400                };
401                tx.send(RequestEnvelope::plain(request))
402                    .map_err(map_send_envelope_error)
403            }
404        }
405    }
406
407    async fn send_immediate_disconnect_async(&self, request: Request) -> Result<(), ClientError> {
408        match &self.request_tx {
409            RequestSender::Plain(tx) => tx.send_async(request).await.map_err(ClientError::from),
410            RequestSender::WithNotice {
411                immediate_disconnect,
412                ..
413            } => immediate_disconnect
414                .send_async(RequestEnvelope::plain(request))
415                .await
416                .map_err(map_send_envelope_error),
417        }
418    }
419
420    fn try_send_immediate_disconnect(&self, request: Request) -> Result<(), ClientError> {
421        match &self.request_tx {
422            RequestSender::Plain(tx) => tx.try_send(request).map_err(ClientError::from),
423            RequestSender::WithNotice {
424                immediate_disconnect,
425                ..
426            } => immediate_disconnect
427                .try_send(RequestEnvelope::plain(request))
428                .map_err(map_try_send_envelope_error),
429        }
430    }
431
432    fn send_immediate_disconnect(&self, request: Request) -> Result<(), ClientError> {
433        match &self.request_tx {
434            RequestSender::Plain(tx) => tx.send(request).map_err(ClientError::from),
435            RequestSender::WithNotice {
436                immediate_disconnect,
437                ..
438            } => immediate_disconnect
439                .send(RequestEnvelope::plain(request))
440                .map_err(map_send_envelope_error),
441        }
442    }
443
444    async fn send_tracked_publish_async(
445        &self,
446        publish: Publish,
447    ) -> Result<PublishNotice, ClientError> {
448        let RequestSender::WithNotice {
449            requests: request_tx,
450            ..
451        } = &self.request_tx
452        else {
453            return Err(ClientError::TrackingUnavailable);
454        };
455
456        let (notice_tx, notice) = PublishNoticeTx::new();
457        request_tx
458            .send_async(RequestEnvelope::tracked_publish(publish, notice_tx))
459            .await
460            .map_err(map_send_envelope_error)?;
461        Ok(notice)
462    }
463
464    fn try_send_tracked_publish(&self, publish: Publish) -> Result<PublishNotice, ClientError> {
465        let RequestSender::WithNotice {
466            requests: request_tx,
467            ..
468        } = &self.request_tx
469        else {
470            return Err(ClientError::TrackingUnavailable);
471        };
472
473        let (notice_tx, notice) = PublishNoticeTx::new();
474        request_tx
475            .try_send(RequestEnvelope::tracked_publish(publish, notice_tx))
476            .map_err(map_try_send_envelope_error)?;
477        Ok(notice)
478    }
479
480    async fn send_tracked_subscribe_async(
481        &self,
482        subscribe: Subscribe,
483    ) -> Result<SubscribeNotice, ClientError> {
484        let RequestSender::WithNotice {
485            control_requests: request_tx,
486            ..
487        } = &self.request_tx
488        else {
489            return Err(ClientError::TrackingUnavailable);
490        };
491
492        let (notice_tx, notice) = SubscribeNoticeTx::new();
493        request_tx
494            .send_async(RequestEnvelope::tracked_subscribe(subscribe, notice_tx))
495            .await
496            .map_err(map_send_envelope_error)?;
497        Ok(notice)
498    }
499
500    fn try_send_tracked_subscribe(
501        &self,
502        subscribe: Subscribe,
503    ) -> Result<SubscribeNotice, ClientError> {
504        let RequestSender::WithNotice {
505            control_requests: request_tx,
506            ..
507        } = &self.request_tx
508        else {
509            return Err(ClientError::TrackingUnavailable);
510        };
511
512        let (notice_tx, notice) = SubscribeNoticeTx::new();
513        request_tx
514            .try_send(RequestEnvelope::tracked_subscribe(subscribe, notice_tx))
515            .map_err(map_try_send_envelope_error)?;
516        Ok(notice)
517    }
518
519    async fn send_tracked_unsubscribe_async(
520        &self,
521        unsubscribe: Unsubscribe,
522    ) -> Result<UnsubscribeNotice, ClientError> {
523        let RequestSender::WithNotice {
524            control_requests: request_tx,
525            ..
526        } = &self.request_tx
527        else {
528            return Err(ClientError::TrackingUnavailable);
529        };
530
531        let (notice_tx, notice) = UnsubscribeNoticeTx::new();
532        request_tx
533            .send_async(RequestEnvelope::tracked_unsubscribe(unsubscribe, notice_tx))
534            .await
535            .map_err(map_send_envelope_error)?;
536        Ok(notice)
537    }
538
539    fn try_send_tracked_unsubscribe(
540        &self,
541        unsubscribe: Unsubscribe,
542    ) -> Result<UnsubscribeNotice, ClientError> {
543        let RequestSender::WithNotice {
544            control_requests: request_tx,
545            ..
546        } = &self.request_tx
547        else {
548            return Err(ClientError::TrackingUnavailable);
549        };
550
551        let (notice_tx, notice) = UnsubscribeNoticeTx::new();
552        request_tx
553            .try_send(RequestEnvelope::tracked_unsubscribe(unsubscribe, notice_tx))
554            .map_err(map_try_send_envelope_error)?;
555        Ok(notice)
556    }
557
558    /// Sends a MQTT Publish to the `EventLoop`.
559    async fn handle_publish<T, V>(
560        &self,
561        topic: T,
562        qos: QoS,
563        retain: bool,
564        payload: V,
565    ) -> Result<(), ClientError>
566    where
567        T: Into<PublishTopic>,
568        V: Into<Vec<u8>>,
569    {
570        let (topic, needs_validation) = topic.into().into_string_and_validation();
571        let invalid_topic = needs_validation && !valid_publish_topic(&topic);
572        let mut publish = Publish::new(topic, qos, payload);
573        publish.retain = retain;
574        let publish = Request::Publish(publish);
575
576        if invalid_topic {
577            return Err(ClientError::Request(publish));
578        }
579
580        self.send_request_async(publish).await?;
581        Ok(())
582    }
583
584    async fn handle_publish_tracked<T, V>(
585        &self,
586        topic: T,
587        qos: QoS,
588        retain: bool,
589        payload: V,
590    ) -> Result<PublishNotice, ClientError>
591    where
592        T: Into<PublishTopic>,
593        V: Into<Vec<u8>>,
594    {
595        let (topic, needs_validation) = topic.into().into_string_and_validation();
596        let invalid_topic = needs_validation && !valid_publish_topic(&topic);
597        let mut publish = Publish::new(topic, qos, payload);
598        publish.retain = retain;
599        let request = Request::Publish(publish.clone());
600
601        if invalid_topic {
602            return Err(ClientError::Request(request));
603        }
604
605        self.send_tracked_publish_async(publish).await
606    }
607
608    /// Sends a MQTT Publish to the `EventLoop`.
609    ///
610    /// # Errors
611    ///
612    /// Returns an error if the topic is invalid or if the request cannot be
613    /// queued on the event loop.
614    pub async fn publish<T, V>(
615        &self,
616        topic: T,
617        qos: QoS,
618        retain: bool,
619        payload: V,
620    ) -> Result<(), ClientError>
621    where
622        T: Into<PublishTopic>,
623        V: Into<Vec<u8>>,
624    {
625        self.handle_publish(topic, qos, retain, payload).await
626    }
627
628    /// Sends a MQTT Publish to the `EventLoop` and returns a notice which resolves on MQTT ack milestone.
629    ///
630    /// # Errors
631    ///
632    /// Returns an error if the topic is invalid or if the request cannot be
633    /// queued on the event loop.
634    pub async fn publish_tracked<T, V>(
635        &self,
636        topic: T,
637        qos: QoS,
638        retain: bool,
639        payload: V,
640    ) -> Result<PublishNotice, ClientError>
641    where
642        T: Into<PublishTopic>,
643        V: Into<Vec<u8>>,
644    {
645        self.handle_publish_tracked(topic, qos, retain, payload)
646            .await
647    }
648
649    /// Attempts to send a MQTT Publish to the `EventLoop`.
650    fn handle_try_publish<T, V>(
651        &self,
652        topic: T,
653        qos: QoS,
654        retain: bool,
655        payload: V,
656    ) -> Result<(), ClientError>
657    where
658        T: Into<PublishTopic>,
659        V: Into<Vec<u8>>,
660    {
661        let (topic, needs_validation) = topic.into().into_string_and_validation();
662        let invalid_topic = needs_validation && !valid_publish_topic(&topic);
663        let mut publish = Publish::new(topic, qos, payload);
664        publish.retain = retain;
665        let publish = Request::Publish(publish);
666
667        if invalid_topic {
668            return Err(ClientError::TryRequest(publish));
669        }
670
671        self.try_send_request(publish)?;
672        Ok(())
673    }
674
675    fn handle_try_publish_tracked<T, V>(
676        &self,
677        topic: T,
678        qos: QoS,
679        retain: bool,
680        payload: V,
681    ) -> Result<PublishNotice, ClientError>
682    where
683        T: Into<PublishTopic>,
684        V: Into<Vec<u8>>,
685    {
686        let (topic, needs_validation) = topic.into().into_string_and_validation();
687        let invalid_topic = needs_validation && !valid_publish_topic(&topic);
688        let mut publish = Publish::new(topic, qos, payload);
689        publish.retain = retain;
690        let request = Request::Publish(publish.clone());
691
692        if invalid_topic {
693            return Err(ClientError::TryRequest(request));
694        }
695
696        self.try_send_tracked_publish(publish)
697    }
698
699    /// Attempts to send a MQTT Publish to the `EventLoop`.
700    ///
701    /// This is the non-blocking publish API for overload policies that may drop
702    /// outgoing publishes. If the bounded request channel is full, this returns
703    /// an error immediately and the publish has not been queued.
704    ///
705    /// # Errors
706    ///
707    /// Returns an error if the topic is invalid or if the request cannot be
708    /// queued immediately on the event loop.
709    pub fn try_publish<T, V>(
710        &self,
711        topic: T,
712        qos: QoS,
713        retain: bool,
714        payload: V,
715    ) -> Result<(), ClientError>
716    where
717        T: Into<PublishTopic>,
718        V: Into<Vec<u8>>,
719    {
720        self.handle_try_publish(topic, qos, retain, payload)
721    }
722
723    /// Attempts to send a MQTT Publish to the `EventLoop` and returns a notice.
724    ///
725    /// This is the non-blocking tracked publish API for overload policies that
726    /// may drop outgoing publishes. If the bounded request channel is full, this
727    /// returns an error immediately and the publish has not been queued.
728    ///
729    /// # Errors
730    ///
731    /// Returns an error if the topic is invalid or if the request cannot be
732    /// queued immediately on the event loop.
733    pub fn try_publish_tracked<T, V>(
734        &self,
735        topic: T,
736        qos: QoS,
737        retain: bool,
738        payload: V,
739    ) -> Result<PublishNotice, ClientError>
740    where
741        T: Into<PublishTopic>,
742        V: Into<Vec<u8>>,
743    {
744        self.handle_try_publish_tracked(topic, qos, retain, payload)
745    }
746
747    /// Prepares a MQTT PubAck/PubRec packet for manual acknowledgement.
748    ///
749    /// Returns `None` for `QoS0` publishes, which do not require acknowledgement.
750    pub const fn prepare_ack(&self, publish: &Publish) -> Option<ManualAck> {
751        prepare_ack(publish)
752    }
753
754    /// Sends a prepared MQTT PubAck/PubRec to the `EventLoop`.
755    ///
756    /// This is useful when `manual_acks` is enabled and acknowledgement must be deferred.
757    ///
758    /// # Errors
759    ///
760    /// Returns an error if the acknowledgement cannot be queued on the event
761    /// loop.
762    pub async fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
763        self.send_request_async(ack.into_request()).await?;
764        Ok(())
765    }
766
767    /// Attempts to send a prepared MQTT PubAck/PubRec to the `EventLoop`.
768    ///
769    /// # Errors
770    ///
771    /// Returns an error if the acknowledgement cannot be queued immediately on
772    /// the event loop.
773    pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
774        self.try_send_request(ack.into_request())?;
775        Ok(())
776    }
777
778    /// Sends a MQTT PubAck/PubRec to the `EventLoop` based on publish `QoS`.
779    /// Only needed if the `manual_acks` flag is set.
780    ///
781    /// # Errors
782    ///
783    /// Returns an error if the derived acknowledgement cannot be queued on the
784    /// event loop.
785    pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
786        if let Some(ack) = self.prepare_ack(publish) {
787            self.manual_ack(ack).await?;
788        }
789        Ok(())
790    }
791
792    /// Attempts to send a MQTT PubAck/PubRec to the `EventLoop` based on publish `QoS`.
793    /// Only needed if the `manual_acks` flag is set.
794    ///
795    /// # Errors
796    ///
797    /// Returns an error if the derived acknowledgement cannot be queued
798    /// immediately on the event loop.
799    pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
800        if let Some(ack) = self.prepare_ack(publish) {
801            self.try_manual_ack(ack)?;
802        }
803        Ok(())
804    }
805
806    /// Sends a MQTT Publish to the `EventLoop`
807    async fn handle_publish_bytes<T>(
808        &self,
809        topic: T,
810        qos: QoS,
811        retain: bool,
812        payload: Bytes,
813    ) -> Result<(), ClientError>
814    where
815        T: Into<PublishTopic>,
816    {
817        let (topic, needs_validation) = topic.into().into_string_and_validation();
818        let invalid_topic = needs_validation && !valid_publish_topic(&topic);
819        let mut publish = Publish::from_bytes(topic, qos, payload);
820        publish.retain = retain;
821        let publish = Request::Publish(publish);
822
823        if invalid_topic {
824            return Err(ClientError::Request(publish));
825        }
826
827        self.send_request_async(publish).await?;
828        Ok(())
829    }
830
831    async fn handle_publish_bytes_tracked<T>(
832        &self,
833        topic: T,
834        qos: QoS,
835        retain: bool,
836        payload: Bytes,
837    ) -> Result<PublishNotice, ClientError>
838    where
839        T: Into<PublishTopic>,
840    {
841        let (topic, needs_validation) = topic.into().into_string_and_validation();
842        let invalid_topic = needs_validation && !valid_publish_topic(&topic);
843        let mut publish = Publish::from_bytes(topic, qos, payload);
844        publish.retain = retain;
845        let request = Request::Publish(publish.clone());
846
847        if invalid_topic {
848            return Err(ClientError::Request(request));
849        }
850
851        self.send_tracked_publish_async(publish).await
852    }
853
854    /// Sends a MQTT Publish to the `EventLoop`
855    ///
856    /// # Errors
857    ///
858    /// Returns an error if the topic is invalid or if the request cannot be
859    /// queued on the event loop.
860    pub async fn publish_bytes<T>(
861        &self,
862        topic: T,
863        qos: QoS,
864        retain: bool,
865        payload: Bytes,
866    ) -> Result<(), ClientError>
867    where
868        T: Into<PublishTopic>,
869    {
870        self.handle_publish_bytes(topic, qos, retain, payload).await
871    }
872
873    /// Sends a MQTT Publish with `Bytes` payload and returns a tracked notice.
874    ///
875    /// # Errors
876    ///
877    /// Returns an error if the topic is invalid or if the request cannot be
878    /// queued on the event loop.
879    pub async fn publish_bytes_tracked<T>(
880        &self,
881        topic: T,
882        qos: QoS,
883        retain: bool,
884        payload: Bytes,
885    ) -> Result<PublishNotice, ClientError>
886    where
887        T: Into<PublishTopic>,
888    {
889        self.handle_publish_bytes_tracked(topic, qos, retain, payload)
890            .await
891    }
892
893    /// Sends a MQTT Subscribe to the `EventLoop`
894    ///
895    /// # Errors
896    ///
897    /// Returns an error if the topic filter is invalid or if the request
898    /// cannot be queued on the event loop.
899    pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
900        let subscribe = Subscribe::new(topic, qos);
901        if !subscribe_has_valid_filters(&subscribe) {
902            return Err(ClientError::Request(subscribe.into()));
903        }
904
905        self.send_request_async(subscribe.into()).await?;
906        Ok(())
907    }
908
909    /// Sends a tracked MQTT Subscribe to the `EventLoop`.
910    ///
911    /// # Errors
912    ///
913    /// Returns an error if the topic filter is invalid or if the request
914    /// cannot be queued on the event loop.
915    pub async fn subscribe_tracked<S: Into<String>>(
916        &self,
917        topic: S,
918        qos: QoS,
919    ) -> Result<SubscribeNotice, ClientError> {
920        let subscribe = Subscribe::new(topic, qos);
921        if !subscribe_has_valid_filters(&subscribe) {
922            return Err(ClientError::Request(subscribe.into()));
923        }
924
925        self.send_tracked_subscribe_async(subscribe).await
926    }
927
928    /// Attempts to send a MQTT Subscribe to the `EventLoop`
929    ///
930    /// # Errors
931    ///
932    /// Returns an error if the topic filter is invalid or if the request
933    /// cannot be queued immediately on the event loop.
934    pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
935        let subscribe = Subscribe::new(topic, qos);
936        if !subscribe_has_valid_filters(&subscribe) {
937            return Err(ClientError::TryRequest(subscribe.into()));
938        }
939
940        self.try_send_request(subscribe.into())?;
941        Ok(())
942    }
943
944    /// Attempts to send a tracked MQTT Subscribe to the `EventLoop`.
945    ///
946    /// # Errors
947    ///
948    /// Returns an error if the topic filter is invalid or if the request
949    /// cannot be queued immediately on the event loop.
950    pub fn try_subscribe_tracked<S: Into<String>>(
951        &self,
952        topic: S,
953        qos: QoS,
954    ) -> Result<SubscribeNotice, ClientError> {
955        let subscribe = Subscribe::new(topic, qos);
956        if !subscribe_has_valid_filters(&subscribe) {
957            return Err(ClientError::TryRequest(subscribe.into()));
958        }
959
960        self.try_send_tracked_subscribe(subscribe)
961    }
962
963    /// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
964    ///
965    /// # Errors
966    ///
967    /// Returns an error if the filter list is invalid or if the request cannot
968    /// be queued on the event loop.
969    pub async fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
970    where
971        T: IntoIterator<Item = SubscribeFilter>,
972    {
973        let subscribe = Subscribe::new_many(topics);
974        if !subscribe_has_valid_filters(&subscribe) {
975            return Err(ClientError::Request(subscribe.into()));
976        }
977
978        self.send_request_async(subscribe.into()).await?;
979        Ok(())
980    }
981
982    /// Sends a tracked MQTT Subscribe for multiple topics to the `EventLoop`.
983    ///
984    /// # Errors
985    ///
986    /// Returns an error if the filter list is invalid or if the request cannot
987    /// be queued on the event loop.
988    pub async fn subscribe_many_tracked<T>(&self, topics: T) -> Result<SubscribeNotice, ClientError>
989    where
990        T: IntoIterator<Item = SubscribeFilter>,
991    {
992        let subscribe = Subscribe::new_many(topics);
993        if !subscribe_has_valid_filters(&subscribe) {
994            return Err(ClientError::Request(subscribe.into()));
995        }
996
997        self.send_tracked_subscribe_async(subscribe).await
998    }
999
1000    /// Attempts to send a MQTT Subscribe for multiple topics to the `EventLoop`
1001    ///
1002    /// # Errors
1003    ///
1004    /// Returns an error if the filter list is invalid or if the request cannot
1005    /// be queued immediately on the event loop.
1006    pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
1007    where
1008        T: IntoIterator<Item = SubscribeFilter>,
1009    {
1010        let subscribe = Subscribe::new_many(topics);
1011        if !subscribe_has_valid_filters(&subscribe) {
1012            return Err(ClientError::TryRequest(subscribe.into()));
1013        }
1014        self.try_send_request(subscribe.into())?;
1015        Ok(())
1016    }
1017
1018    /// Attempts to send a tracked MQTT Subscribe for multiple topics to the `EventLoop`.
1019    ///
1020    /// # Errors
1021    ///
1022    /// Returns an error if the filter list is invalid or if the request cannot
1023    /// be queued immediately on the event loop.
1024    pub fn try_subscribe_many_tracked<T>(&self, topics: T) -> Result<SubscribeNotice, ClientError>
1025    where
1026        T: IntoIterator<Item = SubscribeFilter>,
1027    {
1028        let subscribe = Subscribe::new_many(topics);
1029        if !subscribe_has_valid_filters(&subscribe) {
1030            return Err(ClientError::TryRequest(subscribe.into()));
1031        }
1032
1033        self.try_send_tracked_subscribe(subscribe)
1034    }
1035
1036    /// Sends a MQTT Unsubscribe to the `EventLoop`
1037    ///
1038    /// # Errors
1039    ///
1040    /// Returns an error if the request cannot be queued on the event loop.
1041    pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
1042        let unsubscribe = Unsubscribe::new(topic.into());
1043        let request = Request::Unsubscribe(unsubscribe);
1044        self.send_request_async(request).await?;
1045        Ok(())
1046    }
1047
1048    /// Sends a tracked MQTT Unsubscribe to the `EventLoop`.
1049    ///
1050    /// # Errors
1051    ///
1052    /// Returns an error if the request cannot be queued on the event loop.
1053    pub async fn unsubscribe_tracked<S: Into<String>>(
1054        &self,
1055        topic: S,
1056    ) -> Result<UnsubscribeNotice, ClientError> {
1057        let unsubscribe = Unsubscribe::new(topic.into());
1058        self.send_tracked_unsubscribe_async(unsubscribe).await
1059    }
1060
1061    /// Attempts to send a MQTT Unsubscribe to the `EventLoop`
1062    ///
1063    /// # Errors
1064    ///
1065    /// Returns an error if the request cannot be queued immediately on the
1066    /// event loop.
1067    pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
1068        let unsubscribe = Unsubscribe::new(topic.into());
1069        let request = Request::Unsubscribe(unsubscribe);
1070        self.try_send_request(request)?;
1071        Ok(())
1072    }
1073
1074    /// Attempts to send a tracked MQTT Unsubscribe to the `EventLoop`.
1075    ///
1076    /// # Errors
1077    ///
1078    /// Returns an error if the request cannot be queued immediately on the
1079    /// event loop.
1080    pub fn try_unsubscribe_tracked<S: Into<String>>(
1081        &self,
1082        topic: S,
1083    ) -> Result<UnsubscribeNotice, ClientError> {
1084        let unsubscribe = Unsubscribe::new(topic.into());
1085        self.try_send_tracked_unsubscribe(unsubscribe)
1086    }
1087
1088    /// Queues a graceful MQTT disconnect barrier.
1089    ///
1090    /// Once the event loop observes this request, it stops processing later
1091    /// application work, flushes previously accepted `QoS` 0 publishes, and
1092    /// waits for previously accepted outbound `QoS` 1/ `QoS` 2 publishes and
1093    /// tracked subscribe/unsubscribe requests to complete. `QoS` 1 publishes
1094    /// complete on `PUBACK`, `QoS` 2 publishes complete on `PUBCOMP`, tracked
1095    /// subscribes complete on `SUBACK`, and tracked unsubscribes complete on
1096    /// `UNSUBACK`. It then sends and flushes MQTT `DISCONNECT`.
1097    ///
1098    /// This request uses the normal client request channel. Under publish
1099    /// flow-control pressure, it may pass earlier `QoS` 1/ `QoS` 2 publishes
1100    /// that are not currently sendable; once observed, it becomes the graceful
1101    /// drain barrier.
1102    ///
1103    /// # Errors
1104    ///
1105    /// Returns an error if the disconnect request cannot be queued on the
1106    /// event loop.
1107    pub async fn disconnect(&self) -> Result<(), ClientError> {
1108        let request = Request::Disconnect(Disconnect);
1109        self.send_request_async(request).await?;
1110        Ok(())
1111    }
1112
1113    /// Queues a graceful MQTT disconnect barrier with a drain timeout.
1114    ///
1115    /// Once the event loop observes this request, it stops processing later
1116    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
1117    /// up to `timeout` for previously accepted outbound `QoS` 1/ `QoS` 2 publishes
1118    /// and tracked subscribe/unsubscribe requests to complete. `QoS` 1 publishes
1119    /// complete on `PUBACK`, `QoS` 2 publishes complete on `PUBCOMP`, tracked
1120    /// subscribes complete on `SUBACK`, and tracked unsubscribes complete on
1121    /// `UNSUBACK`.
1122    ///
1123    /// If the drain completes before the deadline, the event loop sends and
1124    /// flushes MQTT `DISCONNECT`. If the deadline expires first, polling returns
1125    /// `ConnectionError::DisconnectTimeout` and MQTT `DISCONNECT` is not sent.
1126    ///
1127    /// This request uses the normal client request channel. The timeout starts
1128    /// only after the event loop observes this request, not necessarily when
1129    /// this method queues it.
1130    ///
1131    /// # Errors
1132    ///
1133    /// Returns an error if the disconnect request cannot be queued on the
1134    /// event loop.
1135    pub async fn disconnect_with_timeout(&self, timeout: Duration) -> Result<(), ClientError> {
1136        let request = Request::DisconnectWithTimeout(Disconnect, timeout);
1137        self.send_request_async(request).await?;
1138        Ok(())
1139    }
1140
1141    /// Sends a MQTT disconnect immediately without waiting for in-flight requests.
1142    ///
1143    /// This request uses a dedicated immediate shutdown channel, not the normal
1144    /// application request channel. It may bypass queued application work and
1145    /// does not wait for unresolved `QoS` 1/ `QoS` 2 publish handshakes.
1146    ///
1147    /// # Errors
1148    ///
1149    /// Returns an error if the disconnect request cannot be queued on the
1150    /// event loop.
1151    pub async fn disconnect_now(&self) -> Result<(), ClientError> {
1152        let request = Request::DisconnectNow(Disconnect);
1153        self.send_immediate_disconnect_async(request).await?;
1154        Ok(())
1155    }
1156
1157    /// Attempts to queue a graceful MQTT disconnect barrier.
1158    ///
1159    /// Once the event loop observes this request, it stops processing later
1160    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
1161    /// for previously accepted outbound `QoS` 1/ `QoS` 2 publishes and tracked
1162    /// subscribe/unsubscribe requests to complete before sending MQTT
1163    /// `DISCONNECT`.
1164    ///
1165    /// This request uses the normal client request channel. Under publish
1166    /// flow-control pressure, it may pass earlier `QoS` 1/ `QoS` 2 publishes
1167    /// that are not currently sendable; once observed, it becomes the graceful
1168    /// drain barrier.
1169    ///
1170    /// # Errors
1171    ///
1172    /// Returns an error if the disconnect request cannot be queued
1173    /// immediately on the event loop.
1174    pub fn try_disconnect(&self) -> Result<(), ClientError> {
1175        let request = Request::Disconnect(Disconnect);
1176        self.try_send_request(request)?;
1177        Ok(())
1178    }
1179
1180    /// Attempts to queue a graceful MQTT disconnect with a drain timeout.
1181    ///
1182    /// Once the event loop observes this request, it stops processing later
1183    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
1184    /// up to `timeout` for previously accepted outbound `QoS` 1/ `QoS` 2 publishes
1185    /// and tracked subscribe/unsubscribe requests to complete. `QoS` 1 publishes
1186    /// complete on `PUBACK`, `QoS` 2 publishes complete on `PUBCOMP`, tracked
1187    /// subscribes complete on `SUBACK`, and tracked unsubscribes complete on
1188    /// `UNSUBACK`.
1189    ///
1190    /// If the drain completes before the deadline, the event loop sends and
1191    /// flushes MQTT `DISCONNECT`. If the deadline expires first, polling returns
1192    /// `ConnectionError::DisconnectTimeout` and MQTT `DISCONNECT` is not sent.
1193    ///
1194    /// This request uses the normal client request channel. The timeout starts
1195    /// only after the event loop observes this request, not necessarily when
1196    /// this method queues it.
1197    ///
1198    /// # Errors
1199    ///
1200    /// Returns an error if the disconnect request cannot be queued
1201    /// immediately on the event loop.
1202    pub fn try_disconnect_with_timeout(&self, timeout: Duration) -> Result<(), ClientError> {
1203        let request = Request::DisconnectWithTimeout(Disconnect, timeout);
1204        self.try_send_request(request)?;
1205        Ok(())
1206    }
1207
1208    /// Attempts to queue an immediate MQTT disconnect.
1209    ///
1210    /// This request uses a dedicated immediate shutdown channel, not the normal
1211    /// application request channel. It may bypass queued application work and
1212    /// does not wait for unresolved `QoS` 1/ `QoS` 2 publish handshakes.
1213    ///
1214    /// # Errors
1215    ///
1216    /// Returns an error if the disconnect request cannot be queued
1217    /// immediately on the event loop.
1218    pub fn try_disconnect_now(&self) -> Result<(), ClientError> {
1219        let request = Request::DisconnectNow(Disconnect);
1220        self.try_send_immediate_disconnect(request)?;
1221        Ok(())
1222    }
1223}
1224
1225const fn prepare_ack(publish: &Publish) -> Option<ManualAck> {
1226    let ack = match publish.qos {
1227        QoS::AtMostOnce => return None,
1228        QoS::AtLeastOnce => ManualAck::PubAck(PubAck::new(publish.pkid)),
1229        QoS::ExactlyOnce => ManualAck::PubRec(PubRec::new(publish.pkid)),
1230    };
1231    Some(ack)
1232}
1233
/// A synchronous client that communicates with the MQTT `EventLoop`.
///
/// This is cloneable and can be used to synchronously [`publish`](`Client::publish`) and
/// [`subscribe`](`Client::subscribe`) through the `EventLoop`/`Connection`, which is to be polled in parallel
/// by iterating over the object returned by [`Connection.iter()`](Connection::iter) in a separate thread.
///
/// **NOTE**: The `EventLoop`/`Connection` must be polled regularly (`.next()` in case of `Connection`) in order
/// to send, receive and process packets from the broker, i.e. to make progress.
///
/// An asynchronous channel handle can also be extracted if necessary.
#[derive(Clone)]
pub struct Client {
    /// The wrapped asynchronous client; the synchronous methods forward their requests through it.
    client: AsyncClient,
}
1248
1249impl Client {
1250    /// Create a builder for a [`Client`].
1251    ///
1252    /// The returned [`ClientBuilder`] only builds the synchronous client
1253    /// pair, which keeps the terminal `build()` method aligned with this
1254    /// entry point.
1255    #[must_use]
1256    pub const fn builder(options: MqttOptions) -> ClientBuilder {
1257        ClientBuilder::new(options)
1258    }
1259
1260    /// Create a new `Client` from a channel `Sender`.
1261    ///
1262    /// This is mostly useful for creating a test instance where you can
1263    /// listen on the corresponding receiver.
1264    #[must_use]
1265    pub const fn from_sender(request_tx: Sender<Request>) -> Self {
1266        Self {
1267            client: AsyncClient::from_senders(request_tx),
1268        }
1269    }
1270
1271    /// Sends a MQTT Publish to the `EventLoop`
1272    ///
1273    /// # Errors
1274    ///
1275    /// Returns an error if the topic is invalid or if the request cannot be
1276    /// queued on the event loop.
1277    pub fn publish<T, V>(
1278        &self,
1279        topic: T,
1280        qos: QoS,
1281        retain: bool,
1282        payload: V,
1283    ) -> Result<(), ClientError>
1284    where
1285        T: Into<PublishTopic>,
1286        V: Into<Vec<u8>>,
1287    {
1288        let (topic, needs_validation) = topic.into().into_string_and_validation();
1289        let invalid_topic = needs_validation && !valid_publish_topic(&topic);
1290        let mut publish = Publish::new(topic, qos, payload);
1291        publish.retain = retain;
1292        let publish = Request::Publish(publish);
1293        if invalid_topic {
1294            return Err(ClientError::Request(publish));
1295        }
1296        self.client.send_request(publish)?;
1297        Ok(())
1298    }
1299
1300    /// Attempts to send a MQTT Publish to the `EventLoop`.
1301    ///
1302    /// # Errors
1303    ///
1304    /// Returns an error if the topic is invalid or if the request cannot be
1305    /// queued immediately on the event loop.
1306    pub fn try_publish<T, V>(
1307        &self,
1308        topic: T,
1309        qos: QoS,
1310        retain: bool,
1311        payload: V,
1312    ) -> Result<(), ClientError>
1313    where
1314        T: Into<PublishTopic>,
1315        V: Into<Vec<u8>>,
1316    {
1317        self.client.try_publish(topic, qos, retain, payload)?;
1318        Ok(())
1319    }
1320
1321    /// Prepares a MQTT PubAck/PubRec packet for manual acknowledgement.
1322    ///
1323    /// Returns `None` for `QoS0` publishes, which do not require acknowledgement.
1324    pub const fn prepare_ack(&self, publish: &Publish) -> Option<ManualAck> {
1325        self.client.prepare_ack(publish)
1326    }
1327
1328    /// Sends a prepared MQTT PubAck/PubRec to the `EventLoop`.
1329    ///
1330    /// # Errors
1331    ///
1332    /// Returns an error if the acknowledgement cannot be queued on the event
1333    /// loop.
1334    pub fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
1335        self.client.send_request(ack.into_request())?;
1336        Ok(())
1337    }
1338
1339    /// Attempts to send a prepared MQTT PubAck/PubRec to the `EventLoop`.
1340    ///
1341    /// # Errors
1342    ///
1343    /// Returns an error if the acknowledgement cannot be queued immediately on
1344    /// the event loop.
1345    pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> {
1346        self.client.try_manual_ack(ack)?;
1347        Ok(())
1348    }
1349
1350    /// Sends a MQTT PubAck/PubRec to the `EventLoop` based on publish `QoS`.
1351    /// Only needed if the `manual_acks` flag is set.
1352    ///
1353    /// # Errors
1354    ///
1355    /// Returns an error if the derived acknowledgement cannot be queued on the
1356    /// event loop.
1357    pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
1358        if let Some(ack) = self.prepare_ack(publish) {
1359            self.manual_ack(ack)?;
1360        }
1361        Ok(())
1362    }
1363
1364    /// Attempts to send a MQTT PubAck/PubRec to the `EventLoop` based on publish `QoS`.
1365    /// Only needed if the `manual_acks` flag is set.
1366    ///
1367    /// # Errors
1368    ///
1369    /// Returns an error if the derived acknowledgement cannot be queued
1370    /// immediately on the event loop.
1371    pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
1372        if let Some(ack) = self.prepare_ack(publish) {
1373            self.try_manual_ack(ack)?;
1374        }
1375        Ok(())
1376    }
1377
1378    /// Sends a MQTT Subscribe to the `EventLoop`
1379    ///
1380    /// # Errors
1381    ///
1382    /// Returns an error if the topic filter is invalid or if the request
1383    /// cannot be queued on the event loop.
1384    pub fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
1385        let subscribe = Subscribe::new(topic, qos);
1386        if !subscribe_has_valid_filters(&subscribe) {
1387            return Err(ClientError::Request(subscribe.into()));
1388        }
1389
1390        self.client.send_request(subscribe.into())?;
1391        Ok(())
1392    }
1393
1394    /// Sends a MQTT Subscribe to the `EventLoop`
1395    ///
1396    /// # Errors
1397    ///
1398    /// Returns an error if the topic filter is invalid or if the request
1399    /// cannot be queued immediately on the event loop.
1400    pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
1401        self.client.try_subscribe(topic, qos)?;
1402        Ok(())
1403    }
1404
1405    /// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
1406    ///
1407    /// # Errors
1408    ///
1409    /// Returns an error if the filter list is invalid or if the request cannot
1410    /// be queued on the event loop.
1411    pub fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
1412    where
1413        T: IntoIterator<Item = SubscribeFilter>,
1414    {
1415        let subscribe = Subscribe::new_many(topics);
1416        if !subscribe_has_valid_filters(&subscribe) {
1417            return Err(ClientError::Request(subscribe.into()));
1418        }
1419
1420        self.client.send_request(subscribe.into())?;
1421        Ok(())
1422    }
1423
1424    /// Attempts to send a MQTT Subscribe for multiple topics to the `EventLoop`.
1425    ///
1426    /// # Errors
1427    ///
1428    /// Returns an error if the filter list is invalid or if the request cannot
1429    /// be queued immediately on the event loop.
1430    pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
1431    where
1432        T: IntoIterator<Item = SubscribeFilter>,
1433    {
1434        self.client.try_subscribe_many(topics)
1435    }
1436
1437    /// Sends a MQTT Unsubscribe to the `EventLoop`
1438    ///
1439    /// # Errors
1440    ///
1441    /// Returns an error if the request cannot be queued on the event loop.
1442    pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
1443        let unsubscribe = Unsubscribe::new(topic.into());
1444        let request = Request::Unsubscribe(unsubscribe);
1445        self.client.send_request(request)?;
1446        Ok(())
1447    }
1448
1449    /// Sends a MQTT Unsubscribe to the `EventLoop`
1450    ///
1451    /// # Errors
1452    ///
1453    /// Returns an error if the request cannot be queued immediately on the
1454    /// event loop.
1455    pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
1456        self.client.try_unsubscribe(topic)?;
1457        Ok(())
1458    }
1459
1460    /// Queues a graceful MQTT disconnect barrier.
1461    ///
1462    /// Once the event loop observes this request, it stops processing later
1463    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
1464    /// for previously accepted outbound `QoS` 1/ `QoS` 2 publishes and tracked
1465    /// subscribe/unsubscribe requests to complete before sending MQTT
1466    /// `DISCONNECT`.
1467    ///
1468    /// This request uses the normal client request channel. Under publish
1469    /// flow-control pressure, it may pass earlier `QoS` 1/ `QoS` 2 publishes
1470    /// that are not currently sendable; once observed, it becomes the graceful
1471    /// drain barrier.
1472    ///
1473    /// # Errors
1474    ///
1475    /// Returns an error if the disconnect request cannot be queued on the
1476    /// event loop.
1477    pub fn disconnect(&self) -> Result<(), ClientError> {
1478        let request = Request::Disconnect(Disconnect);
1479        self.client.send_request(request)?;
1480        Ok(())
1481    }
1482
1483    /// Queues a graceful MQTT disconnect barrier with a drain timeout.
1484    ///
1485    /// Once the event loop observes this request, it stops processing later
1486    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
1487    /// up to `timeout` for previously accepted outbound `QoS` 1/ `QoS` 2 publishes
1488    /// and tracked subscribe/unsubscribe requests to complete. `QoS` 1 publishes
1489    /// complete on `PUBACK`, `QoS` 2 publishes complete on `PUBCOMP`, tracked
1490    /// subscribes complete on `SUBACK`, and tracked unsubscribes complete on
1491    /// `UNSUBACK`.
1492    ///
1493    /// If the drain completes before the deadline, the event loop sends and
1494    /// flushes MQTT `DISCONNECT`. If the deadline expires first, polling returns
1495    /// `ConnectionError::DisconnectTimeout` and MQTT `DISCONNECT` is not sent.
1496    ///
1497    /// This request uses the normal client request channel. The timeout starts
1498    /// only after the event loop observes this request, not necessarily when
1499    /// this method queues it.
1500    ///
1501    /// # Errors
1502    ///
1503    /// Returns an error if the disconnect request cannot be queued on the
1504    /// event loop.
1505    pub fn disconnect_with_timeout(&self, timeout: Duration) -> Result<(), ClientError> {
1506        let request = Request::DisconnectWithTimeout(Disconnect, timeout);
1507        self.client.send_request(request)?;
1508        Ok(())
1509    }
1510
1511    /// Sends a MQTT disconnect immediately without waiting for in-flight requests.
1512    ///
1513    /// This request uses a dedicated immediate shutdown channel, not the normal
1514    /// application request channel. It may bypass queued application work and
1515    /// does not wait for unresolved `QoS` 1/ `QoS` 2 publish handshakes.
1516    ///
1517    /// # Errors
1518    ///
1519    /// Returns an error if the disconnect request cannot be queued on the
1520    /// event loop.
1521    pub fn disconnect_now(&self) -> Result<(), ClientError> {
1522        let request = Request::DisconnectNow(Disconnect);
1523        self.client.send_immediate_disconnect(request)?;
1524        Ok(())
1525    }
1526
1527    /// Attempts to queue a graceful MQTT disconnect barrier.
1528    ///
1529    /// Once the event loop observes this request, it stops processing later
1530    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
1531    /// for previously accepted outbound `QoS` 1/ `QoS` 2 publishes and tracked
1532    /// subscribe/unsubscribe requests to complete before sending MQTT
1533    /// `DISCONNECT`.
1534    ///
1535    /// This request uses the normal client request channel. Under publish
1536    /// flow-control pressure, it may pass earlier `QoS` 1/ `QoS` 2 publishes
1537    /// that are not currently sendable; once observed, it becomes the graceful
1538    /// drain barrier.
1539    ///
1540    /// # Errors
1541    ///
1542    /// Returns an error if the disconnect request cannot be queued
1543    /// immediately on the event loop.
1544    pub fn try_disconnect(&self) -> Result<(), ClientError> {
1545        self.client.try_disconnect()?;
1546        Ok(())
1547    }
1548
1549    /// Attempts to queue a graceful MQTT disconnect barrier with a drain timeout.
1550    ///
1551    /// Once the event loop observes this request, it stops processing later
1552    /// application work, flushes previously accepted `QoS` 0 publishes, and waits
1553    /// up to `timeout` for previously accepted outbound `QoS` 1/ `QoS` 2 publishes
1554    /// and tracked subscribe/unsubscribe requests to complete. `QoS` 1 publishes
1555    /// complete on `PUBACK`, `QoS` 2 publishes complete on `PUBCOMP`, tracked
1556    /// subscribes complete on `SUBACK`, and tracked unsubscribes complete on
1557    /// `UNSUBACK`.
1558    ///
1559    /// If the drain completes before the deadline, the event loop sends and
1560    /// flushes MQTT `DISCONNECT`. If the deadline expires first, polling returns
1561    /// `ConnectionError::DisconnectTimeout` and MQTT `DISCONNECT` is not sent.
1562    ///
1563    /// This request uses the normal client request channel. The timeout starts
1564    /// only after the event loop observes this request, not necessarily when
1565    /// this method queues it.
1566    ///
1567    /// # Errors
1568    ///
1569    /// Returns an error if the disconnect request cannot be queued
1570    /// immediately on the event loop.
1571    pub fn try_disconnect_with_timeout(&self, timeout: Duration) -> Result<(), ClientError> {
1572        self.client.try_disconnect_with_timeout(timeout)?;
1573        Ok(())
1574    }
1575
1576    /// Sends a MQTT disconnect immediately without waiting for in-flight requests.
1577    ///
1578    /// This request uses a dedicated immediate shutdown channel, not the normal
1579    /// application request channel. It may bypass queued application work and
1580    /// does not wait for unresolved `QoS` 1/ `QoS` 2 publish handshakes.
1581    ///
1582    /// # Errors
1583    ///
1584    /// Returns an error if the disconnect request cannot be queued
1585    /// immediately on the event loop.
1586    pub fn try_disconnect_now(&self) -> Result<(), ClientError> {
1587        self.client.try_disconnect_now()?;
1588        Ok(())
1589    }
1590}
1591
1592#[must_use]
1593fn valid_publish_topic(topic: &str) -> bool {
1594    !topic.is_empty() && valid_topic(topic)
1595}
1596
1597#[must_use]
1598fn subscribe_has_valid_filters(subscribe: &Subscribe) -> bool {
1599    !subscribe.filters.is_empty()
1600        && subscribe
1601            .filters
1602            .iter()
1603            .all(|filter| valid_filter(&filter.path))
1604}
1605
/// Error returned by [`Connection::recv`] once all request senders have been
/// dropped.
#[derive(Debug, PartialEq, Eq)]
pub struct RecvError;
1609
/// Error returned by [`Connection::try_recv`].
#[derive(Debug, PartialEq, Eq)]
pub enum TryRecvError {
    /// All request senders have been dropped, i.e. the user closed the
    /// requests channel.
    Disconnected,
    /// No event was immediately ready.
    Empty,
}
1618
/// Error returned by [`Connection::recv_timeout`].
#[derive(Debug, PartialEq, Eq)]
pub enum RecvTimeoutError {
    /// All request senders have been dropped, i.e. the user closed the
    /// requests channel.
    Disconnected,
    /// No event arrived before the requested duration elapsed.
    Timeout,
}
1627
/// MQTT connection. Maintains all the necessary state.
///
/// Pairs an [`EventLoop`] with the Tokio [`Runtime`] that the blocking
/// receive methods use to drive `EventLoop::poll`.
pub struct Connection {
    /// The event loop polled by the receive methods; public so callers can reach it directly.
    pub eventloop: EventLoop,
    /// Runtime used to block on, or enter the context for, the event loop's poll future.
    runtime: Runtime,
}
1633impl Connection {
1634    const fn new(eventloop: EventLoop, runtime: Runtime) -> Self {
1635        Self { eventloop, runtime }
1636    }
1637
1638    /// Returns an iterator over this connection. Iterating over this is all that's
1639    /// necessary to make connection progress and maintain a robust connection.
1640    /// Just continuing to loop will reconnect
1641    /// **NOTE** Don't block this while iterating
1642    // ideally this should be named iter_mut because it requires a mutable reference
1643    // Also we can implement IntoIter for this to make it easy to iterate over it
1644    #[must_use = "Connection should be iterated over a loop to make progress"]
1645    pub const fn iter(&mut self) -> Iter<'_> {
1646        Iter { connection: self }
1647    }
1648
1649    /// Attempt to fetch an incoming [`Event`] on the [`EventLoop`], returning an error
1650    /// if all clients/users have closed requests channel.
1651    ///
1652    /// [`EventLoop`]: super::EventLoop
1653    ///
1654    /// # Errors
1655    ///
1656    /// Returns [`RecvError`] if all request senders have been dropped.
1657    pub fn recv(&mut self) -> Result<Result<Event, ConnectionError>, RecvError> {
1658        let f = self.eventloop.poll();
1659        let event = self.runtime.block_on(f);
1660
1661        resolve_event(event).ok_or(RecvError)
1662    }
1663
1664    /// Attempt to fetch an incoming [`Event`] on the [`EventLoop`], returning an error
1665    /// if none immediately present or all clients/users have closed requests channel.
1666    ///
1667    /// [`EventLoop`]: super::EventLoop
1668    ///
1669    /// # Errors
1670    ///
1671    /// Returns [`TryRecvError::Empty`] if no event is immediately ready, or
1672    /// [`TryRecvError::Disconnected`] if all request senders have been dropped.
1673    pub fn try_recv(&mut self) -> Result<Result<Event, ConnectionError>, TryRecvError> {
1674        let f = self.eventloop.poll();
1675        // Enters the runtime context so we can poll the future, as required by `now_or_never()`.
1676        // ref: https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#method.enter
1677        let _guard = self.runtime.enter();
1678        let event = f.now_or_never().ok_or(TryRecvError::Empty)?;
1679
1680        resolve_event(event).ok_or(TryRecvError::Disconnected)
1681    }
1682
1683    /// Attempt to fetch an incoming [`Event`] on the [`EventLoop`], returning an error
1684    /// if all clients/users have closed requests channel or the timeout has expired.
1685    ///
1686    /// [`EventLoop`]: super::EventLoop
1687    ///
1688    /// # Errors
1689    ///
1690    /// Returns [`RecvTimeoutError::Timeout`] if no event arrives before
1691    /// `duration`, or [`RecvTimeoutError::Disconnected`] if all request
1692    /// senders have been dropped.
1693    pub fn recv_timeout(
1694        &mut self,
1695        duration: Duration,
1696    ) -> Result<Result<Event, ConnectionError>, RecvTimeoutError> {
1697        let f = self.eventloop.poll();
1698        let event = self
1699            .runtime
1700            .block_on(async { timeout(duration, f).await })
1701            .map_err(|_| RecvTimeoutError::Timeout)?;
1702
1703        resolve_event(event).ok_or(RecvTimeoutError::Disconnected)
1704    }
1705}
1706
1707fn resolve_event(event: Result<Event, ConnectionError>) -> Option<Result<Event, ConnectionError>> {
1708    match event {
1709        Ok(v) => Some(Ok(v)),
1710        // closing of request channel should stop the iterator
1711        Err(ConnectionError::RequestsDone) => {
1712            trace!("Done with requests");
1713            None
1714        }
1715        Err(e) => Some(Err(e)),
1716    }
1717}
1718
1719/// Iterator which polls the `EventLoop` for connection progress
1720pub struct Iter<'a> {
1721    connection: &'a mut Connection,
1722}
1723
1724impl Iterator for Iter<'_> {
1725    type Item = Result<Event, ConnectionError>;
1726
1727    fn next(&mut self) -> Option<Self::Item> {
1728        self.connection.recv().ok()
1729    }
1730}
1731
1732#[cfg(test)]
1733mod test {
1734    use super::*;
1735    use crate::LastWill;
1736
1737    #[test]
1738    fn calling_iter_twice_on_connection_shouldnt_panic() {
1739        let mut mqttoptions = MqttOptions::new("test-1", "localhost");
1740        let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false);
1741        mqttoptions.set_keep_alive(5).set_last_will(will);
1742
1743        let (_, mut connection) = Client::builder(mqttoptions).capacity(10).build();
1744        let _ = connection.iter();
1745        let _ = connection.iter();
1746    }
1747
1748    #[test]
1749    fn builder_uses_options_request_channel_capacity_by_default() {
1750        let mut mqttoptions = MqttOptions::new("test-1", "localhost");
1751        mqttoptions.set_request_channel_capacity(1);
1752        let builder: AsyncClientBuilder = AsyncClient::builder(mqttoptions);
1753        let (client, _eventloop) = builder.build();
1754
1755        client
1756            .try_publish("hello/world", QoS::AtMostOnce, false, "one")
1757            .expect("first request should fit configured capacity");
1758        assert!(matches!(
1759            client.try_publish("hello/world", QoS::AtMostOnce, false, "two"),
1760            Err(ClientError::TryRequest(Request::Publish(_)))
1761        ));
1762    }
1763
1764    #[test]
1765    fn sync_and_async_entry_points_return_distinct_builder_types() {
1766        let sync_builder = Client::builder(MqttOptions::new("test-sync", "localhost"));
1767        let async_builder = AsyncClient::builder(MqttOptions::new("test-async", "localhost"));
1768
1769        let _: ClientBuilder = sync_builder;
1770        let _: AsyncClientBuilder = async_builder;
1771    }
1772
1773    #[test]
1774    fn builder_capacity_overrides_options_request_channel_capacity() {
1775        let mut mqttoptions = MqttOptions::new("test-1", "localhost");
1776        mqttoptions.set_request_channel_capacity(1);
1777        let (client, _eventloop) = Client::builder(mqttoptions).capacity(2).build();
1778
1779        client
1780            .try_publish("hello/world", QoS::AtMostOnce, false, "one")
1781            .expect("first request should fit overridden capacity");
1782        client
1783            .try_publish("hello/world", QoS::AtMostOnce, false, "two")
1784            .expect("second request should fit overridden capacity");
1785        assert!(matches!(
1786            client.try_publish("hello/world", QoS::AtMostOnce, false, "three"),
1787            Err(ClientError::TryRequest(Request::Publish(_)))
1788        ));
1789    }
1790
1791    #[test]
1792    fn builder_capacity_zero_is_bounded_rendezvous() {
1793        let mqttoptions = MqttOptions::new("test-1", "localhost");
1794        let (client, _eventloop) = AsyncClient::builder(mqttoptions).capacity(0).build();
1795
1796        assert!(matches!(
1797            client.try_publish("hello/world", QoS::AtMostOnce, false, "one"),
1798            Err(ClientError::TryRequest(Request::Publish(_)))
1799        ));
1800    }
1801
1802    #[test]
1803    fn unbounded_builder_allows_try_publish_without_polling() {
1804        let mqttoptions = MqttOptions::new("test-1", "localhost");
1805        let (client, _eventloop) = AsyncClient::builder(mqttoptions).unbounded().build();
1806
1807        for i in 0..128 {
1808            client
1809                .try_publish("hello/world", QoS::AtMostOnce, false, vec![i])
1810                .expect("unbounded channel should accept requests without polling");
1811        }
1812    }
1813
1814    #[tokio::test]
1815    async fn bounded_publish_blocks_when_channel_is_full_without_polling() {
1816        let mqttoptions = MqttOptions::new("test-1", "localhost");
1817        let (client, _eventloop) = AsyncClient::builder(mqttoptions).capacity(1).build();
1818
1819        client
1820            .publish("hello/world", QoS::AtMostOnce, false, "one")
1821            .await
1822            .expect("first request should fit bounded channel");
1823
1824        let result = tokio::time::timeout(
1825            std::time::Duration::from_millis(25),
1826            client.publish("hello/world", QoS::AtMostOnce, false, "two"),
1827        )
1828        .await;
1829        assert!(result.is_err());
1830    }
1831
1832    #[tokio::test]
1833    async fn unbounded_publish_completes_without_polling() {
1834        let mqttoptions = MqttOptions::new("test-1", "localhost");
1835        let (client, _eventloop) = AsyncClient::builder(mqttoptions).unbounded().build();
1836
1837        for i in 0..128 {
1838            client
1839                .publish("hello/world", QoS::AtMostOnce, false, vec![i])
1840                .await
1841                .expect("unbounded channel should accept requests without polling");
1842        }
1843    }
1844
1845    #[test]
1846    fn should_be_able_to_build_test_client_from_channel() {
1847        let (tx, rx) = flume::bounded(1);
1848        let client = Client::from_sender(tx);
1849        client
1850            .publish("hello/world", QoS::ExactlyOnce, false, "good bye")
1851            .expect("Should be able to publish");
1852        let _ = rx.try_recv().expect("Should have message");
1853    }
1854
1855    #[test]
1856    fn prepare_ack_maps_qos_to_manual_ack_packets_v4() {
1857        let (tx, _) = flume::bounded(1);
1858        let client = Client::from_sender(tx);
1859
1860        let qos0 = Publish::new("hello/world", QoS::AtMostOnce, vec![1]);
1861        assert!(client.prepare_ack(&qos0).is_none());
1862
1863        let mut qos1 = Publish::new("hello/world", QoS::AtLeastOnce, vec![1]);
1864        qos1.pkid = 7;
1865        match client.prepare_ack(&qos1) {
1866            Some(ManualAck::PubAck(ack)) => assert_eq!(ack.pkid, 7),
1867            ack => panic!("expected QoS1 PubAck, got {ack:?}"),
1868        }
1869
1870        let mut qos2 = Publish::new("hello/world", QoS::ExactlyOnce, vec![1]);
1871        qos2.pkid = 9;
1872        match client.prepare_ack(&qos2) {
1873            Some(ManualAck::PubRec(ack)) => assert_eq!(ack.pkid, 9),
1874            ack => panic!("expected QoS2 PubRec, got {ack:?}"),
1875        }
1876    }
1877
1878    #[test]
1879    fn manual_ack_sends_puback_request_v4() {
1880        let (tx, rx) = flume::bounded(1);
1881        let client = Client::from_sender(tx);
1882        client
1883            .manual_ack(ManualAck::PubAck(PubAck::new(42)))
1884            .expect("manual_ack should send request");
1885
1886        let request = rx.try_recv().expect("Should have ack request");
1887        match request {
1888            Request::PubAck(ack) => assert_eq!(ack.pkid, 42),
1889            request => panic!("Expected PubAck request, got {request:?}"),
1890        }
1891    }
1892
1893    #[test]
1894    fn try_manual_ack_sends_pubrec_request_v4() {
1895        let (tx, rx) = flume::bounded(1);
1896        let client = Client::from_sender(tx);
1897        client
1898            .try_manual_ack(ManualAck::PubRec(PubRec::new(51)))
1899            .expect("try_manual_ack should send request");
1900
1901        let request = rx.try_recv().expect("Should have ack request");
1902        match request {
1903            Request::PubRec(ack) => assert_eq!(ack.pkid, 51),
1904            request => panic!("Expected PubRec request, got {request:?}"),
1905        }
1906    }
1907
1908    #[test]
1909    fn ack_and_try_ack_use_manual_ack_flow_v4() {
1910        let (tx, rx) = flume::bounded(2);
1911        let client = Client::from_sender(tx);
1912
1913        let mut qos1 = Publish::new("hello/world", QoS::AtLeastOnce, vec![1]);
1914        qos1.pkid = 11;
1915        client.ack(&qos1).expect("ack should send PubAck");
1916
1917        let mut qos2 = Publish::new("hello/world", QoS::ExactlyOnce, vec![1]);
1918        qos2.pkid = 13;
1919        client
1920            .try_ack(&qos2)
1921            .expect("try_ack should send PubRec request");
1922
1923        let first = rx.try_recv().expect("Should receive first ack request");
1924        match first {
1925            Request::PubAck(ack) => assert_eq!(ack.pkid, 11),
1926            request => panic!("Expected PubAck request, got {request:?}"),
1927        }
1928
1929        let second = rx.try_recv().expect("Should receive second ack request");
1930        match second {
1931            Request::PubRec(ack) => assert_eq!(ack.pkid, 13),
1932            request => panic!("Expected PubRec request, got {request:?}"),
1933        }
1934    }
1935
1936    #[test]
1937    fn can_publish_with_validated_topic() {
1938        let (tx, rx) = flume::bounded(1);
1939        let client = Client::from_sender(tx);
1940        let valid_topic = ValidatedTopic::new("hello/world").unwrap();
1941        client
1942            .publish(valid_topic, QoS::ExactlyOnce, false, "good bye")
1943            .expect("Should be able to publish");
1944        let _ = rx.try_recv().expect("Should have message");
1945    }
1946
1947    #[test]
1948    fn publish_accepts_borrowed_string_topic() {
1949        let (tx, rx) = flume::bounded(2);
1950        let client = Client::from_sender(tx);
1951        let topic = "hello/world".to_string();
1952        client
1953            .publish(&topic, QoS::ExactlyOnce, false, "good bye")
1954            .expect("Should be able to publish");
1955        client
1956            .try_publish(&topic, QoS::ExactlyOnce, false, "good bye")
1957            .expect("Should be able to publish");
1958        let _ = rx.try_recv().expect("Should have message");
1959        let _ = rx.try_recv().expect("Should have message");
1960    }
1961
1962    #[test]
1963    fn publish_accepts_cow_topic_variants() {
1964        let (tx, rx) = flume::bounded(2);
1965        let client = Client::from_sender(tx);
1966        client
1967            .publish(
1968                std::borrow::Cow::Borrowed("hello/world"),
1969                QoS::ExactlyOnce,
1970                false,
1971                "good bye",
1972            )
1973            .expect("Should be able to publish");
1974        client
1975            .try_publish(
1976                std::borrow::Cow::Owned("hello/world".to_owned()),
1977                QoS::ExactlyOnce,
1978                false,
1979                "good bye",
1980            )
1981            .expect("Should be able to publish");
1982        let _ = rx.try_recv().expect("Should have message");
1983        let _ = rx.try_recv().expect("Should have message");
1984    }
1985
1986    #[test]
1987    fn publishing_invalid_cow_topic_fails() {
1988        let (tx, _) = flume::bounded(1);
1989        let client = Client::from_sender(tx);
1990        let err = client
1991            .publish(
1992                std::borrow::Cow::Borrowed("a/+/b"),
1993                QoS::ExactlyOnce,
1994                false,
1995                "good bye",
1996            )
1997            .expect_err("Invalid publish topic should fail");
1998        assert!(matches!(err, ClientError::Request(req) if matches!(req, Request::Publish(_))));
1999    }
2000
2001    #[test]
2002    fn validated_topic_ergonomics() {
2003        let valid_topic = ValidatedTopic::new("hello/world").unwrap();
2004        let valid_topic_can_be_cloned = valid_topic.clone();
2005        assert_eq!(valid_topic, valid_topic_can_be_cloned);
2006    }
2007
2008    #[test]
2009    fn creating_invalid_validated_topic_fails() {
2010        assert_eq!(
2011            ValidatedTopic::new("a/+/b"),
2012            Err(InvalidTopic("a/+/b".to_string()))
2013        );
2014        assert_eq!(ValidatedTopic::new(""), Err(InvalidTopic(String::new())));
2015    }
2016
2017    #[test]
2018    fn publishing_invalid_raw_topic_fails() {
2019        let (tx, _) = flume::bounded(1);
2020        let client = Client::from_sender(tx);
2021        let err = client
2022            .publish("a/+/b", QoS::ExactlyOnce, false, "good bye")
2023            .expect_err("Invalid publish topic should fail");
2024        assert!(matches!(err, ClientError::Request(req) if matches!(req, Request::Publish(_))));
2025
2026        let err = client
2027            .publish("", QoS::ExactlyOnce, false, "good bye")
2028            .expect_err("Empty publish topic should fail");
2029        assert!(matches!(err, ClientError::Request(req) if matches!(req, Request::Publish(_))));
2030    }
2031
2032    #[test]
2033    fn async_publish_paths_accept_validated_topic() {
2034        let (tx, rx) = flume::bounded(2);
2035        let client = AsyncClient::from_senders(tx);
2036        let runtime = runtime::Builder::new_current_thread()
2037            .enable_all()
2038            .build()
2039            .unwrap();
2040
2041        runtime.block_on(async {
2042            client
2043                .publish(
2044                    ValidatedTopic::new("hello/world").unwrap(),
2045                    QoS::ExactlyOnce,
2046                    false,
2047                    "good bye",
2048                )
2049                .await
2050                .expect("Should be able to publish");
2051
2052            client
2053                .publish_bytes(
2054                    ValidatedTopic::new("hello/world").unwrap(),
2055                    QoS::ExactlyOnce,
2056                    false,
2057                    Bytes::from_static(b"good bye"),
2058                )
2059                .await
2060                .expect("Should be able to publish");
2061        });
2062
2063        let _ = rx.try_recv().expect("Should have message");
2064        let _ = rx.try_recv().expect("Should have message");
2065    }
2066
2067    #[test]
2068    fn async_publishing_invalid_raw_topic_fails() {
2069        let (tx, _) = flume::bounded(2);
2070        let client = AsyncClient::from_senders(tx);
2071        let runtime = runtime::Builder::new_current_thread()
2072            .enable_all()
2073            .build()
2074            .unwrap();
2075
2076        runtime.block_on(async {
2077            let err = client
2078                .publish("a/+/b", QoS::ExactlyOnce, false, "good bye")
2079                .await
2080                .expect_err("Invalid publish topic should fail");
2081            assert!(matches!(err, ClientError::Request(req) if matches!(req, Request::Publish(_))));
2082
2083            let err = client
2084                .publish_bytes(
2085                    "a/+/b",
2086                    QoS::ExactlyOnce,
2087                    false,
2088                    Bytes::from_static(b"good bye"),
2089                )
2090                .await
2091                .expect_err("Invalid publish topic should fail");
2092            assert!(matches!(err, ClientError::Request(req) if matches!(req, Request::Publish(_))));
2093
2094            let err = client
2095                .publish("", QoS::ExactlyOnce, false, "good bye")
2096                .await
2097                .expect_err("Empty publish topic should fail");
2098            assert!(matches!(err, ClientError::Request(req) if matches!(req, Request::Publish(_))));
2099
2100            let err = client
2101                .publish_bytes("", QoS::ExactlyOnce, false, Bytes::from_static(b"good bye"))
2102                .await
2103                .expect_err("Empty publish topic should fail");
2104            assert!(matches!(err, ClientError::Request(req) if matches!(req, Request::Publish(_))));
2105        });
2106    }
2107
2108    #[test]
2109    fn tracked_publish_requires_tracking_channel() {
2110        let (tx, _) = flume::bounded(2);
2111        let client = AsyncClient::from_senders(tx);
2112        let runtime = runtime::Builder::new_current_thread()
2113            .enable_all()
2114            .build()
2115            .unwrap();
2116
2117        runtime.block_on(async {
2118            let err = client
2119                .publish_tracked("hello/world", QoS::AtLeastOnce, false, "good bye")
2120                .await
2121                .expect_err("tracked publish should fail without tracked channel");
2122            assert!(matches!(err, ClientError::TrackingUnavailable));
2123
2124            let err = client
2125                .publish_bytes_tracked(
2126                    "hello/world",
2127                    QoS::AtLeastOnce,
2128                    false,
2129                    Bytes::from_static(b"good bye"),
2130                )
2131                .await
2132                .expect_err("tracked publish bytes should fail without tracked channel");
2133            assert!(matches!(err, ClientError::TrackingUnavailable));
2134
2135            let err = client
2136                .subscribe_tracked("hello/world", QoS::AtLeastOnce)
2137                .await
2138                .expect_err("tracked subscribe should fail without tracked channel");
2139            assert!(matches!(err, ClientError::TrackingUnavailable));
2140
2141            let err = client
2142                .subscribe_many_tracked(vec![SubscribeFilter::new(
2143                    "hello/world".to_string(),
2144                    QoS::AtLeastOnce,
2145                )])
2146                .await
2147                .expect_err("tracked subscribe many should fail without tracked channel");
2148            assert!(matches!(err, ClientError::TrackingUnavailable));
2149
2150            let err = client
2151                .unsubscribe_tracked("hello/world")
2152                .await
2153                .expect_err("tracked unsubscribe should fail without tracked channel");
2154            assert!(matches!(err, ClientError::TrackingUnavailable));
2155        });
2156
2157        let err = client
2158            .try_subscribe_tracked("hello/world", QoS::AtLeastOnce)
2159            .expect_err("tracked try_subscribe should fail without tracked channel");
2160        assert!(matches!(err, ClientError::TrackingUnavailable));
2161
2162        let err = client
2163            .try_subscribe_many_tracked(vec![SubscribeFilter::new(
2164                "hello/world".to_string(),
2165                QoS::AtLeastOnce,
2166            )])
2167            .expect_err("tracked try_subscribe_many should fail without tracked channel");
2168        assert!(matches!(err, ClientError::TrackingUnavailable));
2169
2170        let err = client
2171            .try_unsubscribe_tracked("hello/world")
2172            .expect_err("tracked try_unsubscribe should fail without tracked channel");
2173        assert!(matches!(err, ClientError::TrackingUnavailable));
2174    }
2175
2176    #[test]
2177    fn tracked_unsubscribe_uses_control_request_channel() {
2178        let (requests, requests_rx) = flume::bounded(1);
2179        let (control_requests, control_requests_rx) = flume::bounded(1);
2180        let (immediate_disconnect, _immediate_disconnect_rx) = flume::unbounded();
2181        let client = AsyncClient {
2182            request_tx: RequestSender::WithNotice {
2183                requests,
2184                control_requests,
2185                immediate_disconnect,
2186            },
2187        };
2188        let runtime = runtime::Builder::new_current_thread()
2189            .enable_all()
2190            .build()
2191            .unwrap();
2192
2193        runtime
2194            .block_on(client.unsubscribe_tracked("hello/world"))
2195            .expect("tracked unsubscribe should enqueue");
2196
2197        assert!(requests_rx.is_empty());
2198        let envelope = control_requests_rx
2199            .try_recv()
2200            .expect("tracked unsubscribe should use control channel");
2201        assert!(matches!(envelope.into_parts().0, Request::Unsubscribe(_)));
2202    }
2203}