rumqttc_dev_patched/v5/
client.rs

//! This module offers high-level synchronous and asynchronous client abstractions over the
//! async `EventLoop`.
3use std::time::Duration;
4
5use super::mqttbytes::v5::{
6    Filter, PubAck, PubRec, Publish, PublishProperties, SubAck, Subscribe, SubscribeProperties,
7    UnsubAck, Unsubscribe, UnsubscribeProperties,
8};
9use super::mqttbytes::QoS;
10use super::{AckOfAck, AckOfPub, ConnectionError, Event, EventLoop, MqttOptions, Request};
11use crate::tokens::{NoResponse, Resolver, Token};
12use crate::{valid_filter, valid_topic};
13
14use bytes::Bytes;
15use flume::{SendError, Sender, TrySendError};
16use futures_util::FutureExt;
17use tokio::runtime::{self, Runtime};
18use tokio::time::timeout;
19
/// Client Error
///
/// Both variants carry the [`Request`] that was not handed to the `EventLoop`, so the caller
/// gets it back.
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
    /// Produced by the `send`/`send_async` based methods: either the channel send failed
    /// (see the `From<SendError<Request>>` impl) or the request was rejected by the
    /// topic/filter validation performed before sending.
    #[error("Failed to send mqtt requests to eventloop")]
    Request(Request),
    /// Produced by the `try_send` based (`try_*`) methods: either the channel send failed
    /// (see the `From<TrySendError<Request>>` impl) or the request was rejected by the
    /// topic/filter validation performed before sending.
    #[error("Failed to send mqtt requests to eventloop")]
    TryRequest(Request),
}
28
29impl From<SendError<Request>> for ClientError {
30    fn from(e: SendError<Request>) -> Self {
31        Self::Request(e.into_inner())
32    }
33}
34
35impl From<TrySendError<Request>> for ClientError {
36    fn from(e: TrySendError<Request>) -> Self {
37        Self::TryRequest(e.into_inner())
38    }
39}
40
/// An asynchronous client, communicates with MQTT `EventLoop`.
///
/// This is cloneable and can be used to asynchronously [`publish`](`AsyncClient::publish`),
/// [`subscribe`](`AsyncClient::subscribe`) through the `EventLoop`, which is to be polled in
/// parallel.
///
/// **NOTE**: The `EventLoop` must be regularly polled in order to send, receive and process packets
/// from the broker, i.e. move ahead.
#[derive(Clone, Debug)]
pub struct AsyncClient {
    /// Sending half of the request channel; every client method pushes a `Request` through it.
    request_tx: Sender<Request>,
}
52
53impl AsyncClient {
54    /// Create a new `AsyncClient`.
55    ///
56    /// `cap` specifies the capacity of the bounded async channel.
57    pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
58        let eventloop = EventLoop::new(options, cap);
59        let request_tx = eventloop.requests_tx.clone();
60
61        let client = AsyncClient { request_tx };
62
63        (client, eventloop)
64    }
65
66    /// Create a new `AsyncClient` from a channel `Sender`.
67    ///
68    /// This is mostly useful for creating a test instance where you can
69    /// listen on the corresponding receiver.
70    pub fn from_senders(request_tx: Sender<Request>) -> AsyncClient {
71        AsyncClient { request_tx }
72    }
73
74    /// Sends a MQTT Publish to the `EventLoop`.
75    async fn handle_publish<S, P>(
76        &self,
77        topic: S,
78        qos: QoS,
79        retain: bool,
80        payload: P,
81        properties: Option<PublishProperties>,
82    ) -> Result<Token<AckOfPub>, ClientError>
83    where
84        S: Into<String>,
85        P: Into<Bytes>,
86    {
87        let (resolver, token) = Resolver::new();
88        let topic = topic.into();
89        let mut publish = Publish::new(&topic, qos, payload, properties);
90        publish.retain = retain;
91        let publish = Request::Publish(publish, resolver);
92        if !valid_topic(&topic) {
93            return Err(ClientError::Request(publish));
94        }
95        self.request_tx.send_async(publish).await?;
96
97        Ok(token)
98    }
99
100    pub async fn publish_with_properties<S, P>(
101        &self,
102        topic: S,
103        qos: QoS,
104        retain: bool,
105        payload: P,
106        properties: PublishProperties,
107    ) -> Result<Token<AckOfPub>, ClientError>
108    where
109        S: Into<String>,
110        P: Into<Bytes>,
111    {
112        self.handle_publish(topic, qos, retain, payload, Some(properties))
113            .await
114    }
115
116    pub async fn publish<S, P>(
117        &self,
118        topic: S,
119        qos: QoS,
120        retain: bool,
121        payload: P,
122    ) -> Result<Token<AckOfPub>, ClientError>
123    where
124        S: Into<String>,
125        P: Into<Bytes>,
126    {
127        self.handle_publish(topic, qos, retain, payload, None).await
128    }
129
130    /// Attempts to send a MQTT Publish to the `EventLoop`.
131    fn handle_try_publish<S, P>(
132        &self,
133        topic: S,
134        qos: QoS,
135        retain: bool,
136        payload: P,
137        properties: Option<PublishProperties>,
138    ) -> Result<Token<AckOfPub>, ClientError>
139    where
140        S: Into<String>,
141        P: Into<Bytes>,
142    {
143        let (resolver, token) = Resolver::new();
144        let topic = topic.into();
145        let mut publish = Publish::new(&topic, qos, payload, properties);
146        publish.retain = retain;
147        let publish = Request::Publish(publish, resolver);
148        if !valid_topic(&topic) {
149            return Err(ClientError::TryRequest(publish));
150        }
151        self.request_tx.try_send(publish)?;
152
153        Ok(token)
154    }
155
156    pub fn try_publish_with_properties<S, P>(
157        &self,
158        topic: S,
159        qos: QoS,
160        retain: bool,
161        payload: P,
162        properties: PublishProperties,
163    ) -> Result<Token<AckOfPub>, ClientError>
164    where
165        S: Into<String>,
166        P: Into<Bytes>,
167    {
168        self.handle_try_publish(topic, qos, retain, payload, Some(properties))
169    }
170
171    pub fn try_publish<S, P>(
172        &self,
173        topic: S,
174        qos: QoS,
175        retain: bool,
176        payload: P,
177    ) -> Result<Token<AckOfPub>, ClientError>
178    where
179        S: Into<String>,
180        P: Into<Bytes>,
181    {
182        self.handle_try_publish(topic, qos, retain, payload, None)
183    }
184
185    /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
186    pub async fn ack(&self, publish: &Publish) -> Result<Token<AckOfAck>, ClientError> {
187        let (resolver, token) = Resolver::new();
188        let ack = get_ack_req(publish, resolver);
189
190        if let Some(ack) = ack {
191            self.request_tx.send_async(ack).await?;
192        }
193
194        Ok(token)
195    }
196
197    /// Attempts to send a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
198    pub fn try_ack(&self, publish: &Publish) -> Result<Token<AckOfAck>, ClientError> {
199        let (resolver, token) = Resolver::new();
200        let ack = get_ack_req(publish, resolver);
201        if let Some(ack) = ack {
202            self.request_tx.try_send(ack)?;
203        }
204
205        Ok(token)
206    }
207
208    /// Sends a MQTT Publish to the `EventLoop`
209    async fn handle_publish_bytes<S>(
210        &self,
211        topic: S,
212        qos: QoS,
213        retain: bool,
214        payload: Bytes,
215        properties: Option<PublishProperties>,
216    ) -> Result<Token<AckOfPub>, ClientError>
217    where
218        S: Into<String>,
219    {
220        let (resolver, token) = Resolver::new();
221        let topic = topic.into();
222        let mut publish = Publish::new(&topic, qos, payload, properties);
223        publish.retain = retain;
224        let publish = Request::Publish(publish, resolver);
225        self.request_tx.send_async(publish).await?;
226
227        Ok(token)
228    }
229
230    pub async fn publish_bytes_with_properties<S>(
231        &self,
232        topic: S,
233        qos: QoS,
234        retain: bool,
235        payload: Bytes,
236        properties: PublishProperties,
237    ) -> Result<Token<AckOfPub>, ClientError>
238    where
239        S: Into<String>,
240    {
241        self.handle_publish_bytes(topic, qos, retain, payload, Some(properties))
242            .await
243    }
244
245    pub async fn publish_bytes<S>(
246        &self,
247        topic: S,
248        qos: QoS,
249        retain: bool,
250        payload: Bytes,
251    ) -> Result<Token<AckOfPub>, ClientError>
252    where
253        S: Into<String>,
254    {
255        self.handle_publish_bytes(topic, qos, retain, payload, None)
256            .await
257    }
258
259    /// Sends a MQTT Subscribe to the `EventLoop`
260    async fn handle_subscribe<S: Into<String>>(
261        &self,
262        topic: S,
263        qos: QoS,
264        properties: Option<SubscribeProperties>,
265    ) -> Result<Token<SubAck>, ClientError> {
266        let (resolver, token) = Resolver::new();
267        let filter = Filter::new(topic, qos);
268        let subscribe = Subscribe::new(filter, properties);
269        let is_valid = subscribe_has_valid_filters(&subscribe);
270        let request = Request::Subscribe(subscribe, resolver);
271        if !is_valid {
272            return Err(ClientError::Request(request));
273        }
274        self.request_tx.send_async(request).await?;
275
276        Ok(token)
277    }
278
279    pub async fn subscribe_with_properties<S: Into<String>>(
280        &self,
281        topic: S,
282        qos: QoS,
283        properties: SubscribeProperties,
284    ) -> Result<Token<SubAck>, ClientError> {
285        self.handle_subscribe(topic, qos, Some(properties)).await
286    }
287
288    pub async fn subscribe<S: Into<String>>(
289        &self,
290        topic: S,
291        qos: QoS,
292    ) -> Result<Token<SubAck>, ClientError> {
293        self.handle_subscribe(topic, qos, None).await
294    }
295
296    /// Attempts to send a MQTT Subscribe to the `EventLoop`
297    fn handle_try_subscribe<S: Into<String>>(
298        &self,
299        topic: S,
300        qos: QoS,
301        properties: Option<SubscribeProperties>,
302    ) -> Result<Token<SubAck>, ClientError> {
303        let (resolver, token) = Resolver::new();
304        let filter = Filter::new(topic, qos);
305        let subscribe = Subscribe::new(filter, properties);
306        let is_valid = subscribe_has_valid_filters(&subscribe);
307        let request = Request::Subscribe(subscribe, resolver);
308        if !is_valid {
309            return Err(ClientError::TryRequest(request));
310        }
311        self.request_tx.try_send(request)?;
312
313        Ok(token)
314    }
315
316    pub fn try_subscribe_with_properties<S: Into<String>>(
317        &self,
318        topic: S,
319        qos: QoS,
320        properties: SubscribeProperties,
321    ) -> Result<Token<SubAck>, ClientError> {
322        self.handle_try_subscribe(topic, qos, Some(properties))
323    }
324
325    pub fn try_subscribe<S: Into<String>>(
326        &self,
327        topic: S,
328        qos: QoS,
329    ) -> Result<Token<SubAck>, ClientError> {
330        self.handle_try_subscribe(topic, qos, None)
331    }
332
333    /// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
334    async fn handle_subscribe_many<T>(
335        &self,
336        topics: T,
337        properties: Option<SubscribeProperties>,
338    ) -> Result<Token<SubAck>, ClientError>
339    where
340        T: IntoIterator<Item = Filter>,
341    {
342        let (resolver, token) = Resolver::new();
343        let subscribe = Subscribe::new_many(topics, properties);
344        let is_valid = subscribe_has_valid_filters(&subscribe);
345        let request = Request::Subscribe(subscribe, resolver);
346        if !is_valid {
347            return Err(ClientError::Request(request));
348        }
349        self.request_tx.send_async(request).await?;
350
351        Ok(token)
352    }
353
354    pub async fn subscribe_many_with_properties<T>(
355        &self,
356        topics: T,
357        properties: SubscribeProperties,
358    ) -> Result<Token<SubAck>, ClientError>
359    where
360        T: IntoIterator<Item = Filter>,
361    {
362        self.handle_subscribe_many(topics, Some(properties)).await
363    }
364
365    pub async fn subscribe_many<T>(&self, topics: T) -> Result<Token<SubAck>, ClientError>
366    where
367        T: IntoIterator<Item = Filter>,
368    {
369        self.handle_subscribe_many(topics, None).await
370    }
371
372    /// Attempts to send a MQTT Subscribe for multiple topics to the `EventLoop`
373    fn handle_try_subscribe_many<T>(
374        &self,
375        topics: T,
376        properties: Option<SubscribeProperties>,
377    ) -> Result<Token<SubAck>, ClientError>
378    where
379        T: IntoIterator<Item = Filter>,
380    {
381        let (resolver, token) = Resolver::new();
382        let subscribe = Subscribe::new_many(topics, properties);
383        let is_valid = subscribe_has_valid_filters(&subscribe);
384        let request = Request::Subscribe(subscribe, resolver);
385        if !is_valid {
386            return Err(ClientError::TryRequest(request));
387        }
388        self.request_tx.try_send(request)?;
389
390        Ok(token)
391    }
392
393    pub fn try_subscribe_many_with_properties<T>(
394        &self,
395        topics: T,
396        properties: SubscribeProperties,
397    ) -> Result<Token<SubAck>, ClientError>
398    where
399        T: IntoIterator<Item = Filter>,
400    {
401        self.handle_try_subscribe_many(topics, Some(properties))
402    }
403
404    pub fn try_subscribe_many<T>(&self, topics: T) -> Result<Token<SubAck>, ClientError>
405    where
406        T: IntoIterator<Item = Filter>,
407    {
408        self.handle_try_subscribe_many(topics, None)
409    }
410
411    /// Sends a MQTT Unsubscribe to the `EventLoop`
412    async fn handle_unsubscribe<S: Into<String>>(
413        &self,
414        topic: S,
415        properties: Option<UnsubscribeProperties>,
416    ) -> Result<Token<UnsubAck>, ClientError> {
417        let (resolver, token) = Resolver::new();
418        let unsubscribe = Unsubscribe::new(topic, properties);
419        let request = Request::Unsubscribe(unsubscribe, resolver);
420        self.request_tx.send_async(request).await?;
421
422        Ok(token)
423    }
424
425    pub async fn unsubscribe_with_properties<S: Into<String>>(
426        &self,
427        topic: S,
428        properties: UnsubscribeProperties,
429    ) -> Result<Token<UnsubAck>, ClientError> {
430        self.handle_unsubscribe(topic, Some(properties)).await
431    }
432
433    pub async fn unsubscribe<S: Into<String>>(
434        &self,
435        topic: S,
436    ) -> Result<Token<UnsubAck>, ClientError> {
437        self.handle_unsubscribe(topic, None).await
438    }
439
440    /// Attempts to send a MQTT Unsubscribe to the `EventLoop`
441    fn handle_try_unsubscribe<S: Into<String>>(
442        &self,
443        topic: S,
444        properties: Option<UnsubscribeProperties>,
445    ) -> Result<Token<UnsubAck>, ClientError> {
446        let (resolver, token) = Resolver::new();
447        let unsubscribe = Unsubscribe::new(topic, properties);
448        let request = Request::Unsubscribe(unsubscribe, resolver);
449        self.request_tx.try_send(request)?;
450
451        Ok(token)
452    }
453
454    pub fn try_unsubscribe_with_properties<S: Into<String>>(
455        &self,
456        topic: S,
457        properties: UnsubscribeProperties,
458    ) -> Result<Token<UnsubAck>, ClientError> {
459        self.handle_try_unsubscribe(topic, Some(properties))
460    }
461
462    pub fn try_unsubscribe<S: Into<String>>(
463        &self,
464        topic: S,
465    ) -> Result<Token<UnsubAck>, ClientError> {
466        self.handle_try_unsubscribe(topic, None)
467    }
468
469    /// Sends a MQTT disconnect to the `EventLoop`
470    pub async fn disconnect(&self) -> Result<Token<NoResponse>, ClientError> {
471        let (resolver, token) = Resolver::new();
472        let request = Request::Disconnect(resolver);
473        self.request_tx.send_async(request).await?;
474
475        Ok(token)
476    }
477
478    /// Attempts to send a MQTT disconnect to the `EventLoop`
479    pub fn try_disconnect(&self) -> Result<Token<NoResponse>, ClientError> {
480        let (resolver, token) = Resolver::new();
481        let request = Request::Disconnect(resolver);
482        self.request_tx.try_send(request)?;
483
484        Ok(token)
485    }
486}
487
488fn get_ack_req(publish: &Publish, resolver: Resolver<AckOfAck>) -> Option<Request> {
489    let ack = match publish.qos {
490        QoS::AtMostOnce => {
491            resolver.resolve(AckOfAck::None);
492            return None;
493        }
494        QoS::AtLeastOnce => Request::PubAck(PubAck::new(publish.pkid, None), resolver),
495        QoS::ExactlyOnce => Request::PubRec(PubRec::new(publish.pkid, None), resolver),
496    };
497    Some(ack)
498}
499
/// A synchronous client, communicates with MQTT `EventLoop`.
///
/// This is cloneable and can be used to synchronously [`publish`](`Client::publish`),
/// [`subscribe`](`Client::subscribe`) through the `EventLoop`/`Connection`, which is to be polled in parallel
/// by iterating over the object returned by [`Connection.iter()`](Connection::iter) in a separate thread.
///
/// **NOTE**: The `EventLoop`/`Connection` must be regularly polled(`.next()` in case of `Connection`) in order
/// to send, receive and process packets from the broker, i.e. move ahead.
///
/// An asynchronous channel handle can also be extracted if necessary.
#[derive(Clone)]
pub struct Client {
    /// The wrapped asynchronous client; its request channel is shared by the blocking methods
    /// and its `try_*` methods are reused directly.
    client: AsyncClient,
}
514
515impl Client {
516    /// Create a new `Client`
517    ///
518    /// `cap` specifies the capacity of the bounded async channel.
519    pub fn new(options: MqttOptions, cap: usize) -> (Client, Connection) {
520        let (client, eventloop) = AsyncClient::new(options, cap);
521        let client = Client { client };
522
523        let runtime = runtime::Builder::new_current_thread()
524            .enable_all()
525            .build()
526            .unwrap();
527
528        let connection = Connection::new(eventloop, runtime);
529        (client, connection)
530    }
531
532    /// Create a new `Client` from a channel `Sender`.
533    ///
534    /// This is mostly useful for creating a test instance where you can
535    /// listen on the corresponding receiver.
536    pub fn from_sender(request_tx: Sender<Request>) -> Client {
537        Client {
538            client: AsyncClient::from_senders(request_tx),
539        }
540    }
541
542    /// Sends a MQTT Publish to the `EventLoop`
543    fn handle_publish<S, P>(
544        &self,
545        topic: S,
546        qos: QoS,
547        retain: bool,
548        payload: P,
549        properties: Option<PublishProperties>,
550    ) -> Result<Token<AckOfPub>, ClientError>
551    where
552        S: Into<String>,
553        P: Into<Bytes>,
554    {
555        let (resolver, token) = Resolver::new();
556        let topic = topic.into();
557        let mut publish = Publish::new(&topic, qos, payload, properties);
558        publish.retain = retain;
559        let publish = Request::Publish(publish, resolver);
560        if !valid_topic(&topic) {
561            return Err(ClientError::Request(publish));
562        }
563        self.client.request_tx.send(publish)?;
564
565        Ok(token)
566    }
567
568    pub fn publish_with_properties<S, P>(
569        &self,
570        topic: S,
571        qos: QoS,
572        retain: bool,
573        payload: P,
574        properties: PublishProperties,
575    ) -> Result<Token<AckOfPub>, ClientError>
576    where
577        S: Into<String>,
578        P: Into<Bytes>,
579    {
580        self.handle_publish(topic, qos, retain, payload, Some(properties))
581    }
582
583    pub fn publish<S, P>(
584        &self,
585        topic: S,
586        qos: QoS,
587        retain: bool,
588        payload: P,
589    ) -> Result<Token<AckOfPub>, ClientError>
590    where
591        S: Into<String>,
592        P: Into<Bytes>,
593    {
594        self.handle_publish(topic, qos, retain, payload, None)
595    }
596
597    pub fn try_publish_with_properties<S, P>(
598        &self,
599        topic: S,
600        qos: QoS,
601        retain: bool,
602        payload: P,
603        properties: PublishProperties,
604    ) -> Result<Token<AckOfPub>, ClientError>
605    where
606        S: Into<String>,
607        P: Into<Bytes>,
608    {
609        self.client
610            .try_publish_with_properties(topic, qos, retain, payload, properties)
611    }
612
613    pub fn try_publish<S, P>(
614        &self,
615        topic: S,
616        qos: QoS,
617        retain: bool,
618        payload: P,
619    ) -> Result<Token<AckOfPub>, ClientError>
620    where
621        S: Into<String>,
622        P: Into<Bytes>,
623    {
624        self.client.try_publish(topic, qos, retain, payload)
625    }
626
627    /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
628    pub fn ack(&self, publish: &Publish) -> Result<Token<AckOfAck>, ClientError> {
629        let (resolver, token) = Resolver::new();
630        let ack = get_ack_req(publish, resolver);
631
632        if let Some(ack) = ack {
633            self.client.request_tx.send(ack)?;
634        }
635
636        Ok(token)
637    }
638
639    /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
640    pub fn try_ack(&self, publish: &Publish) -> Result<Token<AckOfAck>, ClientError> {
641        self.client.try_ack(publish)
642    }
643
644    /// Sends a MQTT Subscribe to the `EventLoop`
645    fn handle_subscribe<S: Into<String>>(
646        &self,
647        topic: S,
648        qos: QoS,
649        properties: Option<SubscribeProperties>,
650    ) -> Result<Token<SubAck>, ClientError> {
651        let (resolver, token) = Resolver::new();
652        let filter = Filter::new(topic, qos);
653        let subscribe = Subscribe::new(filter, properties);
654        let is_valid = subscribe_has_valid_filters(&subscribe);
655        let request = Request::Subscribe(subscribe, resolver);
656        if !is_valid {
657            return Err(ClientError::Request(request));
658        }
659        self.client.request_tx.send(request)?;
660
661        Ok(token)
662    }
663
664    pub fn subscribe_with_properties<S: Into<String>>(
665        &self,
666        topic: S,
667        qos: QoS,
668        properties: SubscribeProperties,
669    ) -> Result<Token<SubAck>, ClientError> {
670        self.handle_subscribe(topic, qos, Some(properties))
671    }
672
673    pub fn subscribe<S: Into<String>>(
674        &self,
675        topic: S,
676        qos: QoS,
677    ) -> Result<Token<SubAck>, ClientError> {
678        self.handle_subscribe(topic, qos, None)
679    }
680
681    /// Sends a MQTT Subscribe to the `EventLoop`
682    pub fn try_subscribe_with_properties<S: Into<String>>(
683        &self,
684        topic: S,
685        qos: QoS,
686        properties: SubscribeProperties,
687    ) -> Result<Token<SubAck>, ClientError> {
688        self.client
689            .try_subscribe_with_properties(topic, qos, properties)
690    }
691
692    pub fn try_subscribe<S: Into<String>>(
693        &self,
694        topic: S,
695        qos: QoS,
696    ) -> Result<Token<SubAck>, ClientError> {
697        self.client.try_subscribe(topic, qos)
698    }
699
700    /// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
701    fn handle_subscribe_many<T>(
702        &self,
703        topics: T,
704        properties: Option<SubscribeProperties>,
705    ) -> Result<Token<SubAck>, ClientError>
706    where
707        T: IntoIterator<Item = Filter>,
708    {
709        let (resolver, token) = Resolver::new();
710        let subscribe = Subscribe::new_many(topics, properties);
711        let is_valid = subscribe_has_valid_filters(&subscribe);
712        let request = Request::Subscribe(subscribe, resolver);
713        if !is_valid {
714            return Err(ClientError::Request(request));
715        }
716        self.client.request_tx.send(request)?;
717
718        Ok(token)
719    }
720
721    pub fn subscribe_many_with_properties<T>(
722        &self,
723        topics: T,
724        properties: SubscribeProperties,
725    ) -> Result<Token<SubAck>, ClientError>
726    where
727        T: IntoIterator<Item = Filter>,
728    {
729        self.handle_subscribe_many(topics, Some(properties))
730    }
731
732    pub fn subscribe_many<T>(&self, topics: T) -> Result<Token<SubAck>, ClientError>
733    where
734        T: IntoIterator<Item = Filter>,
735    {
736        self.handle_subscribe_many(topics, None)
737    }
738
739    pub fn try_subscribe_many_with_properties<T>(
740        &self,
741        topics: T,
742        properties: SubscribeProperties,
743    ) -> Result<Token<SubAck>, ClientError>
744    where
745        T: IntoIterator<Item = Filter>,
746    {
747        self.client
748            .try_subscribe_many_with_properties(topics, properties)
749    }
750
751    pub fn try_subscribe_many<T>(&self, topics: T) -> Result<Token<SubAck>, ClientError>
752    where
753        T: IntoIterator<Item = Filter>,
754    {
755        self.client.try_subscribe_many(topics)
756    }
757
758    /// Sends a MQTT Unsubscribe to the `EventLoop`
759    fn handle_unsubscribe<S: Into<String>>(
760        &self,
761        topic: S,
762        properties: Option<UnsubscribeProperties>,
763    ) -> Result<Token<UnsubAck>, ClientError> {
764        let (resolver, token) = Resolver::new();
765        let unsubscribe = Unsubscribe::new(topic, properties);
766        let request = Request::Unsubscribe(unsubscribe, resolver);
767        self.client.request_tx.send(request)?;
768
769        Ok(token)
770    }
771
772    pub fn unsubscribe_with_properties<S: Into<String>>(
773        &self,
774        topic: S,
775        properties: UnsubscribeProperties,
776    ) -> Result<Token<UnsubAck>, ClientError> {
777        self.handle_unsubscribe(topic, Some(properties))
778    }
779
780    pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<Token<UnsubAck>, ClientError> {
781        self.handle_unsubscribe(topic, None)
782    }
783
784    /// Sends a MQTT Unsubscribe to the `EventLoop`
785    pub fn try_unsubscribe_with_properties<S: Into<String>>(
786        &self,
787        topic: S,
788        properties: UnsubscribeProperties,
789    ) -> Result<Token<UnsubAck>, ClientError> {
790        self.client
791            .try_unsubscribe_with_properties(topic, properties)
792    }
793
794    pub fn try_unsubscribe<S: Into<String>>(
795        &self,
796        topic: S,
797    ) -> Result<Token<UnsubAck>, ClientError> {
798        self.client.try_unsubscribe(topic)
799    }
800
801    /// Sends a MQTT disconnect to the `EventLoop`
802    pub fn disconnect(&self) -> Result<Token<NoResponse>, ClientError> {
803        let (resolver, token) = Resolver::new();
804        let request = Request::Disconnect(resolver);
805        self.client.request_tx.send(request)?;
806
807        Ok(token)
808    }
809
810    /// Sends a MQTT disconnect to the `EventLoop`
811    pub fn try_disconnect(&self) -> Result<Token<NoResponse>, ClientError> {
812        self.client.try_disconnect()
813    }
814}
815
816#[must_use]
817fn subscribe_has_valid_filters(subscribe: &Subscribe) -> bool {
818    !subscribe.filters.is_empty()
819        && subscribe
820            .filters
821            .iter()
822            .all(|filter| valid_filter(&filter.path))
823}
824
/// Error type returned by [`Connection::recv`]
///
/// Produced when polling the `EventLoop` ends with `ConnectionError::RequestsDone`.
#[derive(Debug, Eq, PartialEq)]
pub struct RecvError;
828
/// Error type returned by [`Connection::try_recv`]
#[derive(Debug, Eq, PartialEq)]
pub enum TryRecvError {
    /// User has closed requests channel
    /// (polling the `EventLoop` ended with `ConnectionError::RequestsDone`)
    Disconnected,
    /// The `EventLoop` poll did not complete immediately, so no event is available yet
    Empty,
}
837
/// Error type returned by [`Connection::recv_timeout`]
#[derive(Debug, Eq, PartialEq)]
pub enum RecvTimeoutError {
    /// User has closed requests channel
    /// (polling the `EventLoop` ended with `ConnectionError::RequestsDone`)
    Disconnected,
    /// The `EventLoop` poll did not complete before the given duration elapsed
    Timeout,
}
846
/// MQTT connection. Maintains all the necessary state
///
/// Pairs the `EventLoop` with the tokio `Runtime` used to drive it from synchronous code.
pub struct Connection {
    /// The event loop polled by `recv`, `try_recv`, `recv_timeout` and `iter`.
    pub eventloop: EventLoop,
    /// Runtime on which the event loop's poll future is executed.
    runtime: Runtime,
}
852impl Connection {
853    fn new(eventloop: EventLoop, runtime: Runtime) -> Connection {
854        Connection { eventloop, runtime }
855    }
856
857    /// Returns an iterator over this connection. Iterating over this is all that's
858    /// necessary to make connection progress and maintain a robust connection.
859    /// Just continuing to loop will reconnect
860    /// **NOTE** Don't block this while iterating
861    // ideally this should be named iter_mut because it requires a mutable reference
862    // Also we can implement IntoIter for this to make it easy to iterate over it
863    #[must_use = "Connection should be iterated over a loop to make progress"]
864    pub fn iter(&mut self) -> Iter<'_> {
865        Iter { connection: self }
866    }
867
868    /// Attempt to fetch an incoming [`Event`] on the [`EventLoop`], returning an error
869    /// if all clients/users have closed requests channel.
870    ///
871    /// [`EventLoop`]: super::EventLoop
872    pub fn recv(&mut self) -> Result<Result<Event, ConnectionError>, RecvError> {
873        let f = self.eventloop.poll();
874        let event = self.runtime.block_on(f);
875
876        resolve_event(event).ok_or(RecvError)
877    }
878
879    /// Attempt to fetch an incoming [`Event`] on the [`EventLoop`], returning an error
880    /// if none immediately present or all clients/users have closed requests channel.
881    ///
882    /// [`EventLoop`]: super::EventLoop
883    pub fn try_recv(&mut self) -> Result<Result<Event, ConnectionError>, TryRecvError> {
884        let f = self.eventloop.poll();
885        // Enters the runtime context so we can poll the future, as required by `now_or_never()`.
886        // ref: https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#method.enter
887        let _guard = self.runtime.enter();
888        let event = f.now_or_never().ok_or(TryRecvError::Empty)?;
889
890        resolve_event(event).ok_or(TryRecvError::Disconnected)
891    }
892
893    /// Attempt to fetch an incoming [`Event`] on the [`EventLoop`], returning an error
894    /// if all clients/users have closed requests channel or the timeout has expired.
895    ///
896    /// [`EventLoop`]: super::EventLoop
897    pub fn recv_timeout(
898        &mut self,
899        duration: Duration,
900    ) -> Result<Result<Event, ConnectionError>, RecvTimeoutError> {
901        let f = self.eventloop.poll();
902        let event = self
903            .runtime
904            .block_on(async { timeout(duration, f).await })
905            .map_err(|_| RecvTimeoutError::Timeout)?;
906
907        resolve_event(event).ok_or(RecvTimeoutError::Disconnected)
908    }
909}
910
911fn resolve_event(event: Result<Event, ConnectionError>) -> Option<Result<Event, ConnectionError>> {
912    match event {
913        Ok(v) => Some(Ok(v)),
914        // closing of request channel should stop the iterator
915        Err(ConnectionError::RequestsDone) => {
916            trace!("Done with requests");
917            None
918        }
919        Err(e) => Some(Err(e)),
920    }
921}
922
/// Iterator which polls the `EventLoop` for connection progress
///
/// Created by [`Connection::iter`]; each call to `next` performs one [`Connection::recv`].
pub struct Iter<'a> {
    /// The connection being driven; borrowed mutably for the iterator's lifetime.
    connection: &'a mut Connection,
}
927
928impl Iterator for Iter<'_> {
929    type Item = Result<Event, ConnectionError>;
930
931    fn next(&mut self) -> Option<Self::Item> {
932        self.connection.recv().ok()
933    }
934}
935
#[cfg(test)]
mod test {
    use crate::v5::mqttbytes::v5::LastWill;

    use super::*;

    // Requesting the iterator twice from the same connection must not panic.
    // The iterators are never advanced here.
    #[test]
    fn calling_iter_twice_on_connection_shouldnt_panic() {
        use std::time::Duration;

        let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
        let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false, None);
        mqttoptions
            .set_keep_alive(Duration::from_secs(5))
            .set_last_will(will);

        let (_, mut connection) = Client::new(mqttoptions, 10);
        let _ = connection.iter();
        let _ = connection.iter();
    }

    // A client built from a bare channel sender should deliver its requests to the
    // matching receiver.
    #[test]
    fn should_be_able_to_build_test_client_from_channel() {
        let (tx, rx) = flume::bounded(1);
        let client = Client::from_sender(tx);
        client
            .publish("hello/world", QoS::ExactlyOnce, false, "good bye")
            .expect("Should be able to publish");
        let _ = rx.try_recv().expect("Should have message");
    }
}
966}