1use std::time::Duration;
4
5use super::mqttbytes::v5::{
6 Filter, PubAck, PubRec, Publish, PublishProperties, SubAck, Subscribe, SubscribeProperties,
7 UnsubAck, Unsubscribe, UnsubscribeProperties,
8};
9use super::mqttbytes::QoS;
10use super::{AckOfAck, AckOfPub, ConnectionError, Event, EventLoop, MqttOptions, Request};
11use crate::tokens::{NoResponse, Resolver, Token};
12use crate::{valid_filter, valid_topic};
13
14use bytes::Bytes;
15use flume::{SendError, Sender, TrySendError};
16use futures_util::FutureExt;
17use tokio::runtime::{self, Runtime};
18use tokio::time::timeout;
19
20#[derive(Debug, thiserror::Error)]
22pub enum ClientError {
23 #[error("Failed to send mqtt requests to eventloop")]
24 Request(Request),
25 #[error("Failed to send mqtt requests to eventloop")]
26 TryRequest(Request),
27}
28
29impl From<SendError<Request>> for ClientError {
30 fn from(e: SendError<Request>) -> Self {
31 Self::Request(e.into_inner())
32 }
33}
34
35impl From<TrySendError<Request>> for ClientError {
36 fn from(e: TrySendError<Request>) -> Self {
37 Self::TryRequest(e.into_inner())
38 }
39}
40
41#[derive(Clone, Debug)]
49pub struct AsyncClient {
50 request_tx: Sender<Request>,
51}
52
53impl AsyncClient {
54 pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
58 let eventloop = EventLoop::new(options, cap);
59 let request_tx = eventloop.requests_tx.clone();
60
61 let client = AsyncClient { request_tx };
62
63 (client, eventloop)
64 }
65
66 pub fn from_senders(request_tx: Sender<Request>) -> AsyncClient {
71 AsyncClient { request_tx }
72 }
73
74 async fn handle_publish<S, P>(
76 &self,
77 topic: S,
78 qos: QoS,
79 retain: bool,
80 payload: P,
81 properties: Option<PublishProperties>,
82 ) -> Result<Token<AckOfPub>, ClientError>
83 where
84 S: Into<String>,
85 P: Into<Bytes>,
86 {
87 let (resolver, token) = Resolver::new();
88 let topic = topic.into();
89 let mut publish = Publish::new(&topic, qos, payload, properties);
90 publish.retain = retain;
91 let publish = Request::Publish(publish, resolver);
92 if !valid_topic(&topic) {
93 return Err(ClientError::Request(publish));
94 }
95 self.request_tx.send_async(publish).await?;
96
97 Ok(token)
98 }
99
100 pub async fn publish_with_properties<S, P>(
101 &self,
102 topic: S,
103 qos: QoS,
104 retain: bool,
105 payload: P,
106 properties: PublishProperties,
107 ) -> Result<Token<AckOfPub>, ClientError>
108 where
109 S: Into<String>,
110 P: Into<Bytes>,
111 {
112 self.handle_publish(topic, qos, retain, payload, Some(properties))
113 .await
114 }
115
116 pub async fn publish<S, P>(
117 &self,
118 topic: S,
119 qos: QoS,
120 retain: bool,
121 payload: P,
122 ) -> Result<Token<AckOfPub>, ClientError>
123 where
124 S: Into<String>,
125 P: Into<Bytes>,
126 {
127 self.handle_publish(topic, qos, retain, payload, None).await
128 }
129
130 fn handle_try_publish<S, P>(
132 &self,
133 topic: S,
134 qos: QoS,
135 retain: bool,
136 payload: P,
137 properties: Option<PublishProperties>,
138 ) -> Result<Token<AckOfPub>, ClientError>
139 where
140 S: Into<String>,
141 P: Into<Bytes>,
142 {
143 let (resolver, token) = Resolver::new();
144 let topic = topic.into();
145 let mut publish = Publish::new(&topic, qos, payload, properties);
146 publish.retain = retain;
147 let publish = Request::Publish(publish, resolver);
148 if !valid_topic(&topic) {
149 return Err(ClientError::TryRequest(publish));
150 }
151 self.request_tx.try_send(publish)?;
152
153 Ok(token)
154 }
155
156 pub fn try_publish_with_properties<S, P>(
157 &self,
158 topic: S,
159 qos: QoS,
160 retain: bool,
161 payload: P,
162 properties: PublishProperties,
163 ) -> Result<Token<AckOfPub>, ClientError>
164 where
165 S: Into<String>,
166 P: Into<Bytes>,
167 {
168 self.handle_try_publish(topic, qos, retain, payload, Some(properties))
169 }
170
171 pub fn try_publish<S, P>(
172 &self,
173 topic: S,
174 qos: QoS,
175 retain: bool,
176 payload: P,
177 ) -> Result<Token<AckOfPub>, ClientError>
178 where
179 S: Into<String>,
180 P: Into<Bytes>,
181 {
182 self.handle_try_publish(topic, qos, retain, payload, None)
183 }
184
185 pub async fn ack(&self, publish: &Publish) -> Result<Token<AckOfAck>, ClientError> {
187 let (resolver, token) = Resolver::new();
188 let ack = get_ack_req(publish, resolver);
189
190 if let Some(ack) = ack {
191 self.request_tx.send_async(ack).await?;
192 }
193
194 Ok(token)
195 }
196
197 pub fn try_ack(&self, publish: &Publish) -> Result<Token<AckOfAck>, ClientError> {
199 let (resolver, token) = Resolver::new();
200 let ack = get_ack_req(publish, resolver);
201 if let Some(ack) = ack {
202 self.request_tx.try_send(ack)?;
203 }
204
205 Ok(token)
206 }
207
208 async fn handle_publish_bytes<S>(
210 &self,
211 topic: S,
212 qos: QoS,
213 retain: bool,
214 payload: Bytes,
215 properties: Option<PublishProperties>,
216 ) -> Result<Token<AckOfPub>, ClientError>
217 where
218 S: Into<String>,
219 {
220 let (resolver, token) = Resolver::new();
221 let topic = topic.into();
222 let mut publish = Publish::new(&topic, qos, payload, properties);
223 publish.retain = retain;
224 let publish = Request::Publish(publish, resolver);
225 self.request_tx.send_async(publish).await?;
226
227 Ok(token)
228 }
229
230 pub async fn publish_bytes_with_properties<S>(
231 &self,
232 topic: S,
233 qos: QoS,
234 retain: bool,
235 payload: Bytes,
236 properties: PublishProperties,
237 ) -> Result<Token<AckOfPub>, ClientError>
238 where
239 S: Into<String>,
240 {
241 self.handle_publish_bytes(topic, qos, retain, payload, Some(properties))
242 .await
243 }
244
245 pub async fn publish_bytes<S>(
246 &self,
247 topic: S,
248 qos: QoS,
249 retain: bool,
250 payload: Bytes,
251 ) -> Result<Token<AckOfPub>, ClientError>
252 where
253 S: Into<String>,
254 {
255 self.handle_publish_bytes(topic, qos, retain, payload, None)
256 .await
257 }
258
259 async fn handle_subscribe<S: Into<String>>(
261 &self,
262 topic: S,
263 qos: QoS,
264 properties: Option<SubscribeProperties>,
265 ) -> Result<Token<SubAck>, ClientError> {
266 let (resolver, token) = Resolver::new();
267 let filter = Filter::new(topic, qos);
268 let subscribe = Subscribe::new(filter, properties);
269 let is_valid = subscribe_has_valid_filters(&subscribe);
270 let request = Request::Subscribe(subscribe, resolver);
271 if !is_valid {
272 return Err(ClientError::Request(request));
273 }
274 self.request_tx.send_async(request).await?;
275
276 Ok(token)
277 }
278
279 pub async fn subscribe_with_properties<S: Into<String>>(
280 &self,
281 topic: S,
282 qos: QoS,
283 properties: SubscribeProperties,
284 ) -> Result<Token<SubAck>, ClientError> {
285 self.handle_subscribe(topic, qos, Some(properties)).await
286 }
287
288 pub async fn subscribe<S: Into<String>>(
289 &self,
290 topic: S,
291 qos: QoS,
292 ) -> Result<Token<SubAck>, ClientError> {
293 self.handle_subscribe(topic, qos, None).await
294 }
295
296 fn handle_try_subscribe<S: Into<String>>(
298 &self,
299 topic: S,
300 qos: QoS,
301 properties: Option<SubscribeProperties>,
302 ) -> Result<Token<SubAck>, ClientError> {
303 let (resolver, token) = Resolver::new();
304 let filter = Filter::new(topic, qos);
305 let subscribe = Subscribe::new(filter, properties);
306 let is_valid = subscribe_has_valid_filters(&subscribe);
307 let request = Request::Subscribe(subscribe, resolver);
308 if !is_valid {
309 return Err(ClientError::TryRequest(request));
310 }
311 self.request_tx.try_send(request)?;
312
313 Ok(token)
314 }
315
316 pub fn try_subscribe_with_properties<S: Into<String>>(
317 &self,
318 topic: S,
319 qos: QoS,
320 properties: SubscribeProperties,
321 ) -> Result<Token<SubAck>, ClientError> {
322 self.handle_try_subscribe(topic, qos, Some(properties))
323 }
324
325 pub fn try_subscribe<S: Into<String>>(
326 &self,
327 topic: S,
328 qos: QoS,
329 ) -> Result<Token<SubAck>, ClientError> {
330 self.handle_try_subscribe(topic, qos, None)
331 }
332
333 async fn handle_subscribe_many<T>(
335 &self,
336 topics: T,
337 properties: Option<SubscribeProperties>,
338 ) -> Result<Token<SubAck>, ClientError>
339 where
340 T: IntoIterator<Item = Filter>,
341 {
342 let (resolver, token) = Resolver::new();
343 let subscribe = Subscribe::new_many(topics, properties);
344 let is_valid = subscribe_has_valid_filters(&subscribe);
345 let request = Request::Subscribe(subscribe, resolver);
346 if !is_valid {
347 return Err(ClientError::Request(request));
348 }
349 self.request_tx.send_async(request).await?;
350
351 Ok(token)
352 }
353
354 pub async fn subscribe_many_with_properties<T>(
355 &self,
356 topics: T,
357 properties: SubscribeProperties,
358 ) -> Result<Token<SubAck>, ClientError>
359 where
360 T: IntoIterator<Item = Filter>,
361 {
362 self.handle_subscribe_many(topics, Some(properties)).await
363 }
364
365 pub async fn subscribe_many<T>(&self, topics: T) -> Result<Token<SubAck>, ClientError>
366 where
367 T: IntoIterator<Item = Filter>,
368 {
369 self.handle_subscribe_many(topics, None).await
370 }
371
372 fn handle_try_subscribe_many<T>(
374 &self,
375 topics: T,
376 properties: Option<SubscribeProperties>,
377 ) -> Result<Token<SubAck>, ClientError>
378 where
379 T: IntoIterator<Item = Filter>,
380 {
381 let (resolver, token) = Resolver::new();
382 let subscribe = Subscribe::new_many(topics, properties);
383 let is_valid = subscribe_has_valid_filters(&subscribe);
384 let request = Request::Subscribe(subscribe, resolver);
385 if !is_valid {
386 return Err(ClientError::TryRequest(request));
387 }
388 self.request_tx.try_send(request)?;
389
390 Ok(token)
391 }
392
393 pub fn try_subscribe_many_with_properties<T>(
394 &self,
395 topics: T,
396 properties: SubscribeProperties,
397 ) -> Result<Token<SubAck>, ClientError>
398 where
399 T: IntoIterator<Item = Filter>,
400 {
401 self.handle_try_subscribe_many(topics, Some(properties))
402 }
403
404 pub fn try_subscribe_many<T>(&self, topics: T) -> Result<Token<SubAck>, ClientError>
405 where
406 T: IntoIterator<Item = Filter>,
407 {
408 self.handle_try_subscribe_many(topics, None)
409 }
410
411 async fn handle_unsubscribe<S: Into<String>>(
413 &self,
414 topic: S,
415 properties: Option<UnsubscribeProperties>,
416 ) -> Result<Token<UnsubAck>, ClientError> {
417 let (resolver, token) = Resolver::new();
418 let unsubscribe = Unsubscribe::new(topic, properties);
419 let request = Request::Unsubscribe(unsubscribe, resolver);
420 self.request_tx.send_async(request).await?;
421
422 Ok(token)
423 }
424
425 pub async fn unsubscribe_with_properties<S: Into<String>>(
426 &self,
427 topic: S,
428 properties: UnsubscribeProperties,
429 ) -> Result<Token<UnsubAck>, ClientError> {
430 self.handle_unsubscribe(topic, Some(properties)).await
431 }
432
433 pub async fn unsubscribe<S: Into<String>>(
434 &self,
435 topic: S,
436 ) -> Result<Token<UnsubAck>, ClientError> {
437 self.handle_unsubscribe(topic, None).await
438 }
439
440 fn handle_try_unsubscribe<S: Into<String>>(
442 &self,
443 topic: S,
444 properties: Option<UnsubscribeProperties>,
445 ) -> Result<Token<UnsubAck>, ClientError> {
446 let (resolver, token) = Resolver::new();
447 let unsubscribe = Unsubscribe::new(topic, properties);
448 let request = Request::Unsubscribe(unsubscribe, resolver);
449 self.request_tx.try_send(request)?;
450
451 Ok(token)
452 }
453
454 pub fn try_unsubscribe_with_properties<S: Into<String>>(
455 &self,
456 topic: S,
457 properties: UnsubscribeProperties,
458 ) -> Result<Token<UnsubAck>, ClientError> {
459 self.handle_try_unsubscribe(topic, Some(properties))
460 }
461
462 pub fn try_unsubscribe<S: Into<String>>(
463 &self,
464 topic: S,
465 ) -> Result<Token<UnsubAck>, ClientError> {
466 self.handle_try_unsubscribe(topic, None)
467 }
468
469 pub async fn disconnect(&self) -> Result<Token<NoResponse>, ClientError> {
471 let (resolver, token) = Resolver::new();
472 let request = Request::Disconnect(resolver);
473 self.request_tx.send_async(request).await?;
474
475 Ok(token)
476 }
477
478 pub fn try_disconnect(&self) -> Result<Token<NoResponse>, ClientError> {
480 let (resolver, token) = Resolver::new();
481 let request = Request::Disconnect(resolver);
482 self.request_tx.try_send(request)?;
483
484 Ok(token)
485 }
486}
487
488fn get_ack_req(publish: &Publish, resolver: Resolver<AckOfAck>) -> Option<Request> {
489 let ack = match publish.qos {
490 QoS::AtMostOnce => {
491 resolver.resolve(AckOfAck::None);
492 return None;
493 }
494 QoS::AtLeastOnce => Request::PubAck(PubAck::new(publish.pkid, None), resolver),
495 QoS::ExactlyOnce => Request::PubRec(PubRec::new(publish.pkid, None), resolver),
496 };
497 Some(ack)
498}
499
500#[derive(Clone)]
511pub struct Client {
512 client: AsyncClient,
513}
514
515impl Client {
516 pub fn new(options: MqttOptions, cap: usize) -> (Client, Connection) {
520 let (client, eventloop) = AsyncClient::new(options, cap);
521 let client = Client { client };
522
523 let runtime = runtime::Builder::new_current_thread()
524 .enable_all()
525 .build()
526 .unwrap();
527
528 let connection = Connection::new(eventloop, runtime);
529 (client, connection)
530 }
531
532 pub fn from_sender(request_tx: Sender<Request>) -> Client {
537 Client {
538 client: AsyncClient::from_senders(request_tx),
539 }
540 }
541
542 fn handle_publish<S, P>(
544 &self,
545 topic: S,
546 qos: QoS,
547 retain: bool,
548 payload: P,
549 properties: Option<PublishProperties>,
550 ) -> Result<Token<AckOfPub>, ClientError>
551 where
552 S: Into<String>,
553 P: Into<Bytes>,
554 {
555 let (resolver, token) = Resolver::new();
556 let topic = topic.into();
557 let mut publish = Publish::new(&topic, qos, payload, properties);
558 publish.retain = retain;
559 let publish = Request::Publish(publish, resolver);
560 if !valid_topic(&topic) {
561 return Err(ClientError::Request(publish));
562 }
563 self.client.request_tx.send(publish)?;
564
565 Ok(token)
566 }
567
568 pub fn publish_with_properties<S, P>(
569 &self,
570 topic: S,
571 qos: QoS,
572 retain: bool,
573 payload: P,
574 properties: PublishProperties,
575 ) -> Result<Token<AckOfPub>, ClientError>
576 where
577 S: Into<String>,
578 P: Into<Bytes>,
579 {
580 self.handle_publish(topic, qos, retain, payload, Some(properties))
581 }
582
583 pub fn publish<S, P>(
584 &self,
585 topic: S,
586 qos: QoS,
587 retain: bool,
588 payload: P,
589 ) -> Result<Token<AckOfPub>, ClientError>
590 where
591 S: Into<String>,
592 P: Into<Bytes>,
593 {
594 self.handle_publish(topic, qos, retain, payload, None)
595 }
596
597 pub fn try_publish_with_properties<S, P>(
598 &self,
599 topic: S,
600 qos: QoS,
601 retain: bool,
602 payload: P,
603 properties: PublishProperties,
604 ) -> Result<Token<AckOfPub>, ClientError>
605 where
606 S: Into<String>,
607 P: Into<Bytes>,
608 {
609 self.client
610 .try_publish_with_properties(topic, qos, retain, payload, properties)
611 }
612
613 pub fn try_publish<S, P>(
614 &self,
615 topic: S,
616 qos: QoS,
617 retain: bool,
618 payload: P,
619 ) -> Result<Token<AckOfPub>, ClientError>
620 where
621 S: Into<String>,
622 P: Into<Bytes>,
623 {
624 self.client.try_publish(topic, qos, retain, payload)
625 }
626
627 pub fn ack(&self, publish: &Publish) -> Result<Token<AckOfAck>, ClientError> {
629 let (resolver, token) = Resolver::new();
630 let ack = get_ack_req(publish, resolver);
631
632 if let Some(ack) = ack {
633 self.client.request_tx.send(ack)?;
634 }
635
636 Ok(token)
637 }
638
639 pub fn try_ack(&self, publish: &Publish) -> Result<Token<AckOfAck>, ClientError> {
641 self.client.try_ack(publish)
642 }
643
644 fn handle_subscribe<S: Into<String>>(
646 &self,
647 topic: S,
648 qos: QoS,
649 properties: Option<SubscribeProperties>,
650 ) -> Result<Token<SubAck>, ClientError> {
651 let (resolver, token) = Resolver::new();
652 let filter = Filter::new(topic, qos);
653 let subscribe = Subscribe::new(filter, properties);
654 let is_valid = subscribe_has_valid_filters(&subscribe);
655 let request = Request::Subscribe(subscribe, resolver);
656 if !is_valid {
657 return Err(ClientError::Request(request));
658 }
659 self.client.request_tx.send(request)?;
660
661 Ok(token)
662 }
663
664 pub fn subscribe_with_properties<S: Into<String>>(
665 &self,
666 topic: S,
667 qos: QoS,
668 properties: SubscribeProperties,
669 ) -> Result<Token<SubAck>, ClientError> {
670 self.handle_subscribe(topic, qos, Some(properties))
671 }
672
673 pub fn subscribe<S: Into<String>>(
674 &self,
675 topic: S,
676 qos: QoS,
677 ) -> Result<Token<SubAck>, ClientError> {
678 self.handle_subscribe(topic, qos, None)
679 }
680
681 pub fn try_subscribe_with_properties<S: Into<String>>(
683 &self,
684 topic: S,
685 qos: QoS,
686 properties: SubscribeProperties,
687 ) -> Result<Token<SubAck>, ClientError> {
688 self.client
689 .try_subscribe_with_properties(topic, qos, properties)
690 }
691
692 pub fn try_subscribe<S: Into<String>>(
693 &self,
694 topic: S,
695 qos: QoS,
696 ) -> Result<Token<SubAck>, ClientError> {
697 self.client.try_subscribe(topic, qos)
698 }
699
700 fn handle_subscribe_many<T>(
702 &self,
703 topics: T,
704 properties: Option<SubscribeProperties>,
705 ) -> Result<Token<SubAck>, ClientError>
706 where
707 T: IntoIterator<Item = Filter>,
708 {
709 let (resolver, token) = Resolver::new();
710 let subscribe = Subscribe::new_many(topics, properties);
711 let is_valid = subscribe_has_valid_filters(&subscribe);
712 let request = Request::Subscribe(subscribe, resolver);
713 if !is_valid {
714 return Err(ClientError::Request(request));
715 }
716 self.client.request_tx.send(request)?;
717
718 Ok(token)
719 }
720
721 pub fn subscribe_many_with_properties<T>(
722 &self,
723 topics: T,
724 properties: SubscribeProperties,
725 ) -> Result<Token<SubAck>, ClientError>
726 where
727 T: IntoIterator<Item = Filter>,
728 {
729 self.handle_subscribe_many(topics, Some(properties))
730 }
731
732 pub fn subscribe_many<T>(&self, topics: T) -> Result<Token<SubAck>, ClientError>
733 where
734 T: IntoIterator<Item = Filter>,
735 {
736 self.handle_subscribe_many(topics, None)
737 }
738
739 pub fn try_subscribe_many_with_properties<T>(
740 &self,
741 topics: T,
742 properties: SubscribeProperties,
743 ) -> Result<Token<SubAck>, ClientError>
744 where
745 T: IntoIterator<Item = Filter>,
746 {
747 self.client
748 .try_subscribe_many_with_properties(topics, properties)
749 }
750
751 pub fn try_subscribe_many<T>(&self, topics: T) -> Result<Token<SubAck>, ClientError>
752 where
753 T: IntoIterator<Item = Filter>,
754 {
755 self.client.try_subscribe_many(topics)
756 }
757
758 fn handle_unsubscribe<S: Into<String>>(
760 &self,
761 topic: S,
762 properties: Option<UnsubscribeProperties>,
763 ) -> Result<Token<UnsubAck>, ClientError> {
764 let (resolver, token) = Resolver::new();
765 let unsubscribe = Unsubscribe::new(topic, properties);
766 let request = Request::Unsubscribe(unsubscribe, resolver);
767 self.client.request_tx.send(request)?;
768
769 Ok(token)
770 }
771
772 pub fn unsubscribe_with_properties<S: Into<String>>(
773 &self,
774 topic: S,
775 properties: UnsubscribeProperties,
776 ) -> Result<Token<UnsubAck>, ClientError> {
777 self.handle_unsubscribe(topic, Some(properties))
778 }
779
780 pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<Token<UnsubAck>, ClientError> {
781 self.handle_unsubscribe(topic, None)
782 }
783
784 pub fn try_unsubscribe_with_properties<S: Into<String>>(
786 &self,
787 topic: S,
788 properties: UnsubscribeProperties,
789 ) -> Result<Token<UnsubAck>, ClientError> {
790 self.client
791 .try_unsubscribe_with_properties(topic, properties)
792 }
793
794 pub fn try_unsubscribe<S: Into<String>>(
795 &self,
796 topic: S,
797 ) -> Result<Token<UnsubAck>, ClientError> {
798 self.client.try_unsubscribe(topic)
799 }
800
801 pub fn disconnect(&self) -> Result<Token<NoResponse>, ClientError> {
803 let (resolver, token) = Resolver::new();
804 let request = Request::Disconnect(resolver);
805 self.client.request_tx.send(request)?;
806
807 Ok(token)
808 }
809
810 pub fn try_disconnect(&self) -> Result<Token<NoResponse>, ClientError> {
812 self.client.try_disconnect()
813 }
814}
815
816#[must_use]
817fn subscribe_has_valid_filters(subscribe: &Subscribe) -> bool {
818 !subscribe.filters.is_empty()
819 && subscribe
820 .filters
821 .iter()
822 .all(|filter| valid_filter(&filter.path))
823}
824
825#[derive(Debug, Eq, PartialEq)]
827pub struct RecvError;
828
829#[derive(Debug, Eq, PartialEq)]
831pub enum TryRecvError {
832 Disconnected,
834 Empty,
836}
837
838#[derive(Debug, Eq, PartialEq)]
840pub enum RecvTimeoutError {
841 Disconnected,
843 Timeout,
845}
846
847pub struct Connection {
849 pub eventloop: EventLoop,
850 runtime: Runtime,
851}
852impl Connection {
853 fn new(eventloop: EventLoop, runtime: Runtime) -> Connection {
854 Connection { eventloop, runtime }
855 }
856
857 #[must_use = "Connection should be iterated over a loop to make progress"]
864 pub fn iter(&mut self) -> Iter<'_> {
865 Iter { connection: self }
866 }
867
868 pub fn recv(&mut self) -> Result<Result<Event, ConnectionError>, RecvError> {
873 let f = self.eventloop.poll();
874 let event = self.runtime.block_on(f);
875
876 resolve_event(event).ok_or(RecvError)
877 }
878
879 pub fn try_recv(&mut self) -> Result<Result<Event, ConnectionError>, TryRecvError> {
884 let f = self.eventloop.poll();
885 let _guard = self.runtime.enter();
888 let event = f.now_or_never().ok_or(TryRecvError::Empty)?;
889
890 resolve_event(event).ok_or(TryRecvError::Disconnected)
891 }
892
893 pub fn recv_timeout(
898 &mut self,
899 duration: Duration,
900 ) -> Result<Result<Event, ConnectionError>, RecvTimeoutError> {
901 let f = self.eventloop.poll();
902 let event = self
903 .runtime
904 .block_on(async { timeout(duration, f).await })
905 .map_err(|_| RecvTimeoutError::Timeout)?;
906
907 resolve_event(event).ok_or(RecvTimeoutError::Disconnected)
908 }
909}
910
911fn resolve_event(event: Result<Event, ConnectionError>) -> Option<Result<Event, ConnectionError>> {
912 match event {
913 Ok(v) => Some(Ok(v)),
914 Err(ConnectionError::RequestsDone) => {
916 trace!("Done with requests");
917 None
918 }
919 Err(e) => Some(Err(e)),
920 }
921}
922
923pub struct Iter<'a> {
925 connection: &'a mut Connection,
926}
927
928impl Iterator for Iter<'_> {
929 type Item = Result<Event, ConnectionError>;
930
931 fn next(&mut self) -> Option<Self::Item> {
932 self.connection.recv().ok()
933 }
934}
935
936#[cfg(test)]
937mod test {
938 use crate::v5::mqttbytes::v5::LastWill;
939
940 use super::*;
941
942 #[test]
943 fn calling_iter_twice_on_connection_shouldnt_panic() {
944 use std::time::Duration;
945
946 let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
947 let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false, None);
948 mqttoptions
949 .set_keep_alive(Duration::from_secs(5))
950 .set_last_will(will);
951
952 let (_, mut connection) = Client::new(mqttoptions, 10);
953 let _ = connection.iter();
954 let _ = connection.iter();
955 }
956
957 #[test]
958 fn should_be_able_to_build_test_client_from_channel() {
959 let (tx, rx) = flume::bounded(1);
960 let client = Client::from_sender(tx);
961 client
962 .publish("hello/world", QoS::ExactlyOnce, false, "good bye")
963 .expect("Should be able to publish");
964 let _ = rx.try_recv().expect("Should have message");
965 }
966}