1use crate::notice::{
2 PublishNoticeTx, PublishResult, SubscribeNoticeTx, TrackedNoticeTx, UnsubscribeNoticeTx,
3};
4use crate::{Incoming, MqttState, NetworkOptions, Packet, Request, StateError};
5use crate::{MqttOptions, Outgoing};
6use crate::{NoticeFailureReason, PublishNoticeError};
7use crate::{Transport, framed::Network};
8
9use crate::framed::AsyncReadWrite;
10use crate::mqttbytes::v4::{
11 ConnAck, Connect, ConnectReturnCode, PingReq, Publish, Subscribe, Unsubscribe,
12};
13use flume::{Receiver, Sender, TryRecvError, bounded, unbounded};
14use rumqttc_core::{OutboundScheduler, RequestClass, RequestReadiness, ScheduledRequest};
15use tokio::select;
16use tokio::time::{self, Instant, Sleep};
17
18use std::collections::VecDeque;
19use std::io;
20use std::pin::Pin;
21use std::time::Duration;
22
23#[cfg(unix)]
24use {std::path::Path, tokio::net::UnixStream};
25
26#[cfg(any(feature = "use-rustls-no-provider", feature = "use-native-tls"))]
27use crate::tls;
28
29#[cfg(feature = "websocket")]
30use {
31 crate::websockets::WsAdapter,
32 crate::websockets::{UrlError, split_url, validate_response_headers},
33 async_tungstenite::tungstenite::client::IntoClientRequest,
34};
35
36#[cfg(feature = "proxy")]
37use crate::proxy::ProxyError;
38
39#[derive(Debug)]
40pub struct RequestEnvelope {
41 request: Request,
42 notice: Option<TrackedNoticeTx>,
43}
44
45impl RequestEnvelope {
46 pub(crate) const fn from_parts(request: Request, notice: Option<TrackedNoticeTx>) -> Self {
47 Self { request, notice }
48 }
49
50 pub(crate) const fn plain(request: Request) -> Self {
51 Self {
52 request,
53 notice: None,
54 }
55 }
56
57 pub(crate) const fn tracked_publish(publish: Publish, notice: PublishNoticeTx) -> Self {
58 Self {
59 request: Request::Publish(publish),
60 notice: Some(TrackedNoticeTx::Publish(notice)),
61 }
62 }
63
64 pub(crate) const fn tracked_subscribe(subscribe: Subscribe, notice: SubscribeNoticeTx) -> Self {
65 Self {
66 request: Request::Subscribe(subscribe),
67 notice: Some(TrackedNoticeTx::Subscribe(notice)),
68 }
69 }
70
71 pub(crate) const fn tracked_unsubscribe(
72 unsubscribe: Unsubscribe,
73 notice: UnsubscribeNoticeTx,
74 ) -> Self {
75 Self {
76 request: Request::Unsubscribe(unsubscribe),
77 notice: Some(TrackedNoticeTx::Unsubscribe(notice)),
78 }
79 }
80
81 pub(crate) fn into_parts(self) -> (Request, Option<TrackedNoticeTx>) {
82 (self.request, self.notice)
83 }
84}
85
86#[derive(Clone, Copy, Debug, Eq, PartialEq)]
87pub enum RequestChannelCapacity {
88 Bounded(usize),
89 Unbounded,
90}
91
92struct PendingDisconnect {
93 deadline: Option<Instant>,
94}
95
96impl PendingDisconnect {
97 fn new(timeout: Option<Duration>) -> Self {
98 Self {
99 deadline: timeout.map(|timeout| Instant::now() + timeout),
100 }
101 }
102}
103
104#[derive(Debug, thiserror::Error)]
106pub enum ConnectionError {
107 #[error("Mqtt state: {0}")]
108 MqttState(#[from] StateError),
109 #[error("Network timeout")]
110 NetworkTimeout,
111 #[error("Flush timeout")]
112 FlushTimeout,
113 #[error("Graceful disconnect timed out before outbound protocol state drained")]
114 DisconnectTimeout,
115 #[cfg(feature = "websocket")]
116 #[error("Websocket: {0}")]
117 Websocket(#[from] async_tungstenite::tungstenite::error::Error),
118 #[cfg(feature = "websocket")]
119 #[error("Websocket Connect: {0}")]
120 WsConnect(#[from] http::Error),
121 #[cfg(any(feature = "use-rustls-no-provider", feature = "use-native-tls"))]
122 #[error("TLS: {0}")]
123 Tls(#[from] tls::Error),
124 #[error("I/O: {0}")]
125 Io(#[from] io::Error),
126 #[error("Connection refused, return code: `{0:?}`")]
127 ConnectionRefused(ConnectReturnCode),
128 #[error("Expected ConnAck packet, received: {0:?}")]
129 NotConnAck(Box<Packet>),
130 #[error(
131 "Broker replied with session_present={session_present} for clean_session={clean_session}"
132 )]
133 SessionStateMismatch {
134 clean_session: bool,
135 session_present: bool,
136 },
137 #[error("Broker target is incompatible with the selected transport")]
138 BrokerTransportMismatch,
139 #[error("Requests done")]
142 RequestsDone,
143 #[cfg(feature = "websocket")]
144 #[error("Invalid Url: {0}")]
145 InvalidUrl(#[from] UrlError),
146 #[cfg(feature = "proxy")]
147 #[error("Proxy Connect: {0}")]
148 Proxy(#[from] ProxyError),
149 #[cfg(feature = "websocket")]
150 #[error("Websocket response validation error: ")]
151 ResponseValidation(#[from] crate::websockets::ValidationError),
152 #[cfg(feature = "websocket")]
153 #[error("Websocket request modifier failed: {0}")]
154 RequestModifier(#[source] Box<dyn std::error::Error + Send + Sync>),
155}
156
157pub struct EventLoop {
159 pub mqtt_options: MqttOptions,
161 pub state: MqttState,
163 requests_rx: Receiver<RequestEnvelope>,
165 control_requests_rx: Receiver<RequestEnvelope>,
167 immediate_disconnect_rx: Receiver<RequestEnvelope>,
169 _requests_tx: Option<Sender<RequestEnvelope>>,
172 _control_requests_tx: Option<Sender<RequestEnvelope>>,
173 _immediate_disconnect_tx: Option<Sender<RequestEnvelope>>,
174 pending: VecDeque<RequestEnvelope>,
176 queued: OutboundScheduler<RequestEnvelope>,
178 pub network: Option<Network>,
180 keepalive_timeout: Option<Pin<Box<Sleep>>>,
182 no_sleep: Option<Pin<Box<Sleep>>>,
185 pub network_options: NetworkOptions,
186 pending_disconnect: Option<PendingDisconnect>,
187 disconnect_complete: bool,
188}
189
190#[derive(Debug, Clone, PartialEq, Eq)]
192pub enum Event {
193 Incoming(Incoming),
194 Outgoing(Outgoing),
195}
196
197impl EventLoop {
198 fn reconcile_connack_session(&mut self, session_present: bool) -> Result<(), ConnectionError> {
199 let clean_session = self.mqtt_options.clean_session();
200 if clean_session && session_present {
201 return Err(ConnectionError::SessionStateMismatch {
202 clean_session,
203 session_present,
204 });
205 }
206
207 if !session_present {
208 self.reset_session_state();
209 }
210
211 Ok(())
212 }
213
214 pub fn new(mqtt_options: MqttOptions, cap: usize) -> Self {
219 let (requests_tx, requests_rx) = bounded(cap);
220 let (control_requests_tx, control_requests_rx) = bounded(cap);
221 let (immediate_disconnect_tx, immediate_disconnect_rx) = unbounded();
222 Self::with_channel(
223 mqtt_options,
224 requests_rx,
225 control_requests_rx,
226 immediate_disconnect_rx,
227 Some(requests_tx),
228 Some(control_requests_tx),
229 Some(immediate_disconnect_tx),
230 )
231 }
232
233 pub(crate) fn new_for_async_client_with_capacity(
238 mqtt_options: MqttOptions,
239 capacity: RequestChannelCapacity,
240 ) -> (
241 Self,
242 Sender<RequestEnvelope>,
243 Sender<RequestEnvelope>,
244 Sender<RequestEnvelope>,
245 ) {
246 let (requests_tx, requests_rx) = match capacity {
247 RequestChannelCapacity::Bounded(cap) => bounded(cap),
248 RequestChannelCapacity::Unbounded => unbounded(),
249 };
250 let (control_requests_tx, control_requests_rx) = match capacity {
251 RequestChannelCapacity::Bounded(cap) => bounded(cap),
252 RequestChannelCapacity::Unbounded => unbounded(),
253 };
254 let (immediate_disconnect_tx, immediate_disconnect_rx) = unbounded();
255 let eventloop = Self::with_channel(
256 mqtt_options,
257 requests_rx,
258 control_requests_rx,
259 immediate_disconnect_rx,
260 None,
261 None,
262 None,
263 );
264 (
265 eventloop,
266 requests_tx,
267 control_requests_tx,
268 immediate_disconnect_tx,
269 )
270 }
271
272 #[cfg(test)]
274 pub(crate) fn new_for_async_client(
275 mqtt_options: MqttOptions,
276 cap: usize,
277 ) -> (Self, Sender<RequestEnvelope>) {
278 let (eventloop, request_tx, _control_request_tx, _immediate_disconnect_tx) =
279 Self::new_for_async_client_with_capacity(
280 mqtt_options,
281 RequestChannelCapacity::Bounded(cap),
282 );
283 (eventloop, request_tx)
284 }
285
286 fn with_channel(
287 mqtt_options: MqttOptions,
288 requests_rx: Receiver<RequestEnvelope>,
289 control_requests_rx: Receiver<RequestEnvelope>,
290 immediate_disconnect_rx: Receiver<RequestEnvelope>,
291 requests_tx: Option<Sender<RequestEnvelope>>,
292 control_requests_tx: Option<Sender<RequestEnvelope>>,
293 immediate_disconnect_tx: Option<Sender<RequestEnvelope>>,
294 ) -> Self {
295 let pending = VecDeque::new();
296 let max_inflight = mqtt_options.inflight;
297 let manual_acks = mqtt_options.manual_acks;
298
299 Self {
300 mqtt_options,
301 state: MqttState::new_internal(max_inflight, manual_acks),
302 requests_rx,
303 control_requests_rx,
304 immediate_disconnect_rx,
305 _requests_tx: requests_tx,
306 _control_requests_tx: control_requests_tx,
307 _immediate_disconnect_tx: immediate_disconnect_tx,
308 pending,
309 queued: OutboundScheduler::default(),
310 network: None,
311 keepalive_timeout: None,
312 no_sleep: None,
313 network_options: NetworkOptions::new(),
314 pending_disconnect: None,
315 disconnect_complete: false,
316 }
317 }
318
319 pub fn clean(&mut self) {
328 self.network = None;
329 self.keepalive_timeout = None;
330 self.pending_disconnect = None;
331 for (request, notice) in self.state.clean_with_notices() {
332 self.pending
333 .push_back(RequestEnvelope::from_parts(request, notice));
334 }
335
336 for envelope in self.queued.drain() {
337 if should_replay_after_reconnect(&envelope.request) {
338 self.pending.push_back(envelope);
339 }
340 }
341
342 for envelope in self.requests_rx.drain() {
344 if should_replay_after_reconnect(&envelope.request) {
347 self.pending.push_back(envelope);
348 }
349 }
350
351 for envelope in self.control_requests_rx.drain() {
352 if should_replay_after_reconnect(&envelope.request) {
355 self.pending.push_back(envelope);
356 }
357 }
358 }
359
360 pub fn pending_len(&self) -> usize {
362 self.pending.len() + self.queued.len()
363 }
364
365 pub fn pending_is_empty(&self) -> bool {
367 self.pending.is_empty() && self.queued.is_empty()
368 }
369
370 pub fn drain_pending_as_failed(&mut self, reason: NoticeFailureReason) -> usize {
374 let mut drained = 0;
375 for envelope in self.pending.drain(..) {
376 drained += 1;
377 if let Some(notice) = envelope.notice {
378 match notice {
379 TrackedNoticeTx::Publish(notice) => {
380 notice.error(reason.publish_error());
381 }
382 TrackedNoticeTx::Subscribe(notice) => {
383 notice.error(reason.subscribe_error());
384 }
385 TrackedNoticeTx::Unsubscribe(notice) => {
386 notice.error(reason.unsubscribe_error());
387 }
388 }
389 }
390 }
391 for envelope in self.queued.drain() {
392 drained += 1;
393 if let Some(notice) = envelope.notice {
394 match notice {
395 TrackedNoticeTx::Publish(notice) => {
396 notice.error(reason.publish_error());
397 }
398 TrackedNoticeTx::Subscribe(notice) => {
399 notice.error(reason.subscribe_error());
400 }
401 TrackedNoticeTx::Unsubscribe(notice) => {
402 notice.error(reason.unsubscribe_error());
403 }
404 }
405 }
406 }
407
408 drained
409 }
410
411 pub fn reset_session_state(&mut self) {
413 self.drain_pending_as_failed(NoticeFailureReason::SessionReset);
414 self.state.fail_pending_notices();
415 }
416
417 pub async fn poll(&mut self) -> Result<Event, ConnectionError> {
427 if self.disconnect_complete {
428 return Err(ConnectionError::RequestsDone);
429 }
430
431 if self.network.is_none() {
432 if let Ok(envelope) = self.immediate_disconnect_rx.try_recv() {
433 self.disconnect_complete = true;
434 if let Some(notice) = envelope.notice {
435 drop(notice);
436 }
437 return Err(ConnectionError::RequestsDone);
438 }
439
440 let (network, connack) = match time::timeout(
441 Duration::from_secs(self.network_options.connection_timeout()),
442 connect(&self.mqtt_options, self.network_options.clone()),
443 )
444 .await
445 {
446 Ok(inner) => inner?,
447 Err(_) => return Err(ConnectionError::NetworkTimeout),
448 };
449 self.reconcile_connack_session(connack.session_present)?;
450 self.network = Some(network);
451
452 if self.keepalive_timeout.is_none() && !self.mqtt_options.keep_alive.is_zero() {
453 self.keepalive_timeout = Some(Box::pin(time::sleep(self.mqtt_options.keep_alive)));
454 }
455
456 return Ok(Event::Incoming(Packet::ConnAck(connack)));
457 }
458
459 match self.select().await {
460 Ok(v) => Ok(v),
461 Err(ConnectionError::DisconnectTimeout) => {
462 self.network = None;
463 self.keepalive_timeout = None;
464 self.pending_disconnect = None;
465 self.disconnect_complete = true;
466 Err(ConnectionError::DisconnectTimeout)
467 }
468 Err(e) => {
469 self.clean();
472 Err(e)
473 }
474 }
475 }
476
477 #[allow(clippy::too_many_lines)]
479 async fn select(&mut self) -> Result<Event, ConnectionError> {
480 loop {
481 if let Some(event) = self.state.events.pop_front() {
482 return Ok(event);
483 }
484
485 if let Ok(envelope) = self.immediate_disconnect_rx.try_recv() {
486 return self.handle_immediate_disconnect(envelope).await;
487 }
488
489 if self.queued.is_empty()
490 && self.pending.is_empty()
491 && self.pending_disconnect.is_none()
492 && self.requests_rx.is_disconnected()
493 && self.requests_rx.is_empty()
494 && self.control_requests_rx.is_disconnected()
495 && self.control_requests_rx.is_empty()
496 && self.state.outbound_requests_drained()
497 {
498 return Err(ConnectionError::RequestsDone);
499 }
500
501 if self.handle_ready_requests().await? {
502 if let Some(event) = self.state.events.pop_front() {
503 return Ok(event);
504 }
505 continue;
506 }
507
508 if self.pending_disconnect.is_some() {
509 if self.queued.is_empty() && self.state.outbound_requests_drained() {
510 return self.send_pending_disconnect().await;
511 }
512
513 if let Some(event) = self.poll_disconnect_drain().await? {
514 return Ok(event);
515 }
516 continue;
517 }
518
519 let read_batch_size = self.effective_read_batch_size();
520 let normal_request_admission_allowed =
521 self.normal_request_admission_allowed() || !self.pending.is_empty();
522 let no_sleep = self
523 .no_sleep
524 .get_or_insert_with(|| Box::pin(time::sleep(Duration::MAX)));
525
526 return select! {
527 biased;
528 o = self.immediate_disconnect_rx.recv_async(), if !self.immediate_disconnect_rx.is_disconnected() => match o {
529 Ok(envelope) => self.handle_immediate_disconnect(envelope).await,
530 Err(_) => continue,
531 },
532 o = self.control_requests_rx.recv_async(),
533 if self.pending_disconnect.is_none()
534 && (!self.control_requests_rx.is_empty()
535 || !self.control_requests_rx.is_disconnected()) => match o {
536 Ok(envelope) => {
537 self.try_admit_existing_normal_requests().await;
538 self.queued.push_back(envelope);
539 continue;
540 }
541 Err(_) => continue,
542 },
543 o = Self::next_request(
544 &mut self.pending,
545 &self.requests_rx,
546 self.mqtt_options.pending_throttle
547 ), if self.pending_disconnect.is_none()
548 && normal_request_admission_allowed
549 && (!self.pending.is_empty()
550 || !self.requests_rx.is_empty()
551 || !self.requests_rx.is_disconnected()) => match o {
552 Ok((request, notice)) => {
553 self.admit_normal_request_batch((request, notice)).await;
554 continue;
555 }
556 Err(_) => continue,
557 },
558 o = self.network.as_mut().unwrap().readb(&mut self.state, read_batch_size) => {
559 o?;
560 self.flush_network().await?;
561 Ok(self.state.events.pop_front().unwrap())
562 },
563 () = self.keepalive_timeout.as_mut().unwrap_or(no_sleep),
564 if self.keepalive_timeout.is_some() && !self.mqtt_options.keep_alive.is_zero() => {
565 let timeout = self.keepalive_timeout.as_mut().unwrap();
566 timeout.as_mut().reset(Instant::now() + self.mqtt_options.keep_alive);
567
568 let (outgoing, _flush_notice) = self
569 .state
570 .handle_outgoing_packet_with_notice(Request::PingReq(PingReq), None)?;
571 if let Some(outgoing) = outgoing {
572 self.network.as_mut().unwrap().write(outgoing).await?;
573 }
574 self.flush_network().await?;
575 Ok(self.state.events.pop_front().unwrap())
576 }
577 };
578 }
579 }
580
581 async fn handle_immediate_disconnect(
582 &mut self,
583 envelope: RequestEnvelope,
584 ) -> Result<Event, ConnectionError> {
585 let (request, notice) = envelope.into_parts();
586 let mut should_flush = false;
587 let mut qos0_notices = Vec::new();
588 self.handle_request(request, notice, &mut should_flush, &mut qos0_notices)
589 .await?;
590 self.flush_request_batch(should_flush, qos0_notices).await?;
591 Ok(self.state.events.pop_front().unwrap())
592 }
593
594 async fn handle_ready_requests(&mut self) -> Result<bool, ConnectionError> {
595 let Some((request, notice)) = self.next_scheduled_request() else {
596 return Ok(false);
597 };
598
599 let mut should_flush = false;
600 let mut qos0_notices = Vec::new();
601 if self
602 .handle_request(request, notice, &mut should_flush, &mut qos0_notices)
603 .await?
604 {
605 for _ in 1..self.mqtt_options.max_request_batch.max(1) {
606 let Some((next_request, next_notice)) = self.next_scheduled_request() else {
607 break;
608 };
609
610 if !self
611 .handle_request(
612 next_request,
613 next_notice,
614 &mut should_flush,
615 &mut qos0_notices,
616 )
617 .await?
618 {
619 break;
620 }
621 }
622 }
623
624 self.flush_request_batch(should_flush, qos0_notices).await?;
625 Ok(true)
626 }
627
628 fn next_scheduled_request(&mut self) -> Option<(Request, Option<TrackedNoticeTx>)> {
629 let state = &self.state;
630 self.queued
631 .pop_next(|envelope| classify_request(state, &envelope.request))
632 .map(RequestEnvelope::into_parts)
633 }
634
635 fn normal_request_admission_allowed(&self) -> bool {
636 self.queued.is_empty()
637 || self
638 .queued
639 .has_ready(|envelope| classify_request(&self.state, &envelope.request))
640 }
641
642 async fn try_admit_existing_normal_requests(&mut self) {
643 let ready = self.pending.len() + self.requests_rx.len();
644 for _ in 0..ready {
645 if !(self.normal_request_admission_allowed() || !self.pending.is_empty()) {
646 break;
647 }
648
649 let Some((request, notice)) = Self::try_next_request(
650 &mut self.pending,
651 &self.requests_rx,
652 self.mqtt_options.pending_throttle,
653 )
654 .await
655 else {
656 break;
657 };
658
659 let stop_batch = is_disconnect_request(&request);
660 self.queued
661 .push_back(RequestEnvelope::from_parts(request, notice));
662 if stop_batch {
663 break;
664 }
665 }
666 }
667
668 async fn admit_normal_request_batch(&mut self, first: (Request, Option<TrackedNoticeTx>)) {
669 let (request, notice) = first;
670 let stop_batch = is_disconnect_request(&request);
671 self.queued
672 .push_back(RequestEnvelope::from_parts(request, notice));
673 if stop_batch || !(self.normal_request_admission_allowed() || !self.pending.is_empty()) {
674 return;
675 }
676
677 for _ in 1..self.mqtt_options.max_request_batch.max(1) {
678 let Some((request, notice)) = Self::try_next_request(
679 &mut self.pending,
680 &self.requests_rx,
681 self.mqtt_options.pending_throttle,
682 )
683 .await
684 else {
685 break;
686 };
687 let stop_batch = is_disconnect_request(&request);
688 self.queued
689 .push_back(RequestEnvelope::from_parts(request, notice));
690 if stop_batch {
691 break;
692 }
693 }
694 }
695
696 async fn handle_request(
697 &mut self,
698 request: Request,
699 notice: Option<TrackedNoticeTx>,
700 should_flush: &mut bool,
701 qos0_notices: &mut Vec<PublishNoticeTx>,
702 ) -> Result<bool, ConnectionError> {
703 match request {
704 Request::Disconnect(_) => {
705 self.pending_disconnect = Some(PendingDisconnect::new(None));
706 Ok(false)
707 }
708 Request::DisconnectWithTimeout(_, timeout) => {
709 self.pending_disconnect = Some(PendingDisconnect::new(Some(timeout)));
710 Ok(false)
711 }
712 Request::DisconnectNow(_) => {
713 let (outgoing, _) = self
714 .state
715 .handle_outgoing_packet_with_notice(request, notice)?;
716 if let Some(outgoing) = outgoing {
717 if let Err(err) = self.network.as_mut().unwrap().write(outgoing).await {
718 return Err(ConnectionError::MqttState(err));
719 }
720 *should_flush = true;
721 }
722 self.disconnect_complete = true;
723 Ok(false)
724 }
725 request => {
726 let (outgoing, flush_notice) = self
727 .state
728 .handle_outgoing_packet_with_notice(request, notice)?;
729 if let Some(notice) = flush_notice {
730 qos0_notices.push(notice);
731 }
732 if let Some(outgoing) = outgoing {
733 if let Err(err) = self.network.as_mut().unwrap().write(outgoing).await {
734 for notice in qos0_notices.drain(..) {
735 notice.error(PublishNoticeError::Qos0NotFlushed);
736 }
737 return Err(ConnectionError::MqttState(err));
738 }
739 *should_flush = true;
740 }
741 Ok(true)
742 }
743 }
744 }
745
746 async fn flush_request_batch(
747 &mut self,
748 should_flush: bool,
749 qos0_notices: Vec<PublishNoticeTx>,
750 ) -> Result<(), ConnectionError> {
751 if !should_flush {
752 return Ok(());
753 }
754
755 match time::timeout(
756 Duration::from_secs(self.network_options.connection_timeout()),
757 self.network.as_mut().unwrap().flush(),
758 )
759 .await
760 {
761 Ok(Ok(())) => {
762 for notice in qos0_notices {
763 notice.success(PublishResult::Qos0Flushed);
764 }
765 Ok(())
766 }
767 Ok(Err(err)) => {
768 for notice in qos0_notices {
769 notice.error(PublishNoticeError::Qos0NotFlushed);
770 }
771 Err(ConnectionError::MqttState(err))
772 }
773 Err(_) => {
774 for notice in qos0_notices {
775 notice.error(PublishNoticeError::Qos0NotFlushed);
776 }
777 Err(ConnectionError::FlushTimeout)
778 }
779 }
780 }
781
782 async fn flush_network(&mut self) -> Result<(), ConnectionError> {
783 time::timeout(
784 Duration::from_secs(self.network_options.connection_timeout()),
785 self.network.as_mut().unwrap().flush(),
786 )
787 .await
788 .map_or_else(
789 |_| Err(ConnectionError::FlushTimeout),
790 |inner| inner.map_err(ConnectionError::MqttState),
791 )
792 }
793
794 async fn poll_disconnect_drain(&mut self) -> Result<Option<Event>, ConnectionError> {
795 let read_batch_size = self.effective_read_batch_size();
796 let read = self
797 .network
798 .as_mut()
799 .unwrap()
800 .readb(&mut self.state, read_batch_size);
801
802 if let Some(deadline) = self
803 .pending_disconnect
804 .as_ref()
805 .and_then(|pending| pending.deadline)
806 {
807 select! {
808 o = self.immediate_disconnect_rx.recv_async(), if !self.immediate_disconnect_rx.is_disconnected() => match o {
809 Ok(envelope) => return self.handle_immediate_disconnect(envelope).await.map(Some),
810 Err(_) => return Ok(None),
811 },
812 result = read => result?,
813 () = time::sleep_until(deadline) => return Err(ConnectionError::DisconnectTimeout),
814 }
815 } else {
816 select! {
817 o = self.immediate_disconnect_rx.recv_async(), if !self.immediate_disconnect_rx.is_disconnected() => match o {
818 Ok(envelope) => return self.handle_immediate_disconnect(envelope).await.map(Some),
819 Err(_) => return Ok(None),
820 },
821 result = read => result?,
822 }
823 }
824
825 self.flush_network().await?;
826 Ok(None)
827 }
828
829 async fn send_pending_disconnect(&mut self) -> Result<Event, ConnectionError> {
830 self.pending_disconnect = None;
831 let (outgoing, _) = self
832 .state
833 .handle_outgoing_packet_with_notice(Request::DisconnectNow(crate::Disconnect), None)?;
834
835 if let Some(outgoing) = outgoing {
836 self.network.as_mut().unwrap().write(outgoing).await?;
837 self.flush_network().await?;
838 }
839
840 self.disconnect_complete = true;
841 Ok(self.state.events.pop_front().unwrap())
842 }
843
844 pub fn network_options(&self) -> NetworkOptions {
845 self.network_options.clone()
846 }
847
848 pub fn set_network_options(&mut self, network_options: NetworkOptions) -> &mut Self {
849 self.network_options = network_options;
850 self
851 }
852
853 fn effective_read_batch_size(&self) -> usize {
854 const MAX_READ_BATCH_SIZE: usize = 128;
855 const PENDING_FAIRNESS_CAP: usize = 16;
856
857 let configured = self.mqtt_options.read_batch_size();
858 if configured > 0 {
859 return configured.clamp(1, MAX_READ_BATCH_SIZE);
860 }
861
862 let request_batch = self.mqtt_options.max_request_batch().max(1);
863 let inflight = usize::from(self.mqtt_options.inflight());
864 let mut adaptive = request_batch.max(inflight / 2).max(8);
865
866 if !self.pending.is_empty()
867 || !self.queued.is_empty()
868 || !self.requests_rx.is_empty()
869 || !self.control_requests_rx.is_empty()
870 {
871 adaptive = adaptive.min(PENDING_FAIRNESS_CAP);
872 }
873
874 adaptive.clamp(1, MAX_READ_BATCH_SIZE)
875 }
876
877 async fn try_next_request(
878 pending: &mut VecDeque<RequestEnvelope>,
879 rx: &Receiver<RequestEnvelope>,
880 pending_throttle: Duration,
881 ) -> Option<(Request, Option<TrackedNoticeTx>)> {
882 if !pending.is_empty() {
883 if pending_throttle.is_zero() {
884 tokio::task::yield_now().await;
885 } else {
886 time::sleep(pending_throttle).await;
887 }
888 return pending.pop_front().map(RequestEnvelope::into_parts);
891 }
892
893 match rx.try_recv() {
894 Ok(envelope) => return Some(envelope.into_parts()),
895 Err(TryRecvError::Disconnected) => return None,
896 Err(TryRecvError::Empty) => {}
897 }
898
899 None
900 }
901
902 async fn next_request(
903 pending: &mut VecDeque<RequestEnvelope>,
904 rx: &Receiver<RequestEnvelope>,
905 pending_throttle: Duration,
906 ) -> Result<(Request, Option<TrackedNoticeTx>), ConnectionError> {
907 if pending.is_empty() {
908 rx.recv_async()
909 .await
910 .map(RequestEnvelope::into_parts)
911 .map_err(|_| ConnectionError::RequestsDone)
912 } else {
913 if pending_throttle.is_zero() {
914 tokio::task::yield_now().await;
915 } else {
916 time::sleep(pending_throttle).await;
917 }
918 Ok(pending.pop_front().unwrap().into_parts())
921 }
922 }
923}
924
925fn classify_request(state: &MqttState, request: &Request) -> ScheduledRequest {
926 match request {
927 Request::Publish(publish) if publish.qos != crate::mqttbytes::QoS::AtMostOnce => {
928 ScheduledRequest {
929 class: RequestClass::FlowControlledPublish,
930 readiness: if state.can_send_publish(publish) {
931 RequestReadiness::Ready
932 } else {
933 RequestReadiness::Blocked
934 },
935 }
936 }
937 Request::Publish(_) => ScheduledRequest {
938 class: RequestClass::Publish,
939 readiness: RequestReadiness::Ready,
940 },
941 Request::Subscribe(_) | Request::Unsubscribe(_) => ScheduledRequest {
942 class: RequestClass::Control,
943 readiness: if state.control_packet_identifier_available() {
944 RequestReadiness::Ready
945 } else {
946 RequestReadiness::Blocked
947 },
948 },
949 _ => ScheduledRequest {
950 class: RequestClass::Control,
951 readiness: RequestReadiness::Ready,
952 },
953 }
954}
955
956const fn is_disconnect_request(request: &Request) -> bool {
957 matches!(
958 request,
959 Request::Disconnect(_) | Request::DisconnectWithTimeout(_, _) | Request::DisconnectNow(_)
960 )
961}
962
963async fn connect(
969 mqtt_options: &MqttOptions,
970 network_options: NetworkOptions,
971) -> Result<(Network, ConnAck), ConnectionError> {
972 let mut network = network_connect(mqtt_options, network_options).await?;
974
975 let connack = mqtt_connect(mqtt_options, &mut network).await?;
977
978 Ok((network, connack))
979}
980
981#[allow(clippy::too_many_lines)]
982async fn network_connect(
983 options: &MqttOptions,
984 network_options: NetworkOptions,
985) -> Result<Network, ConnectionError> {
986 let transport = options.transport();
987
988 #[cfg(unix)]
990 if matches!(&transport, Transport::Unix) {
991 let file = options
992 .broker()
993 .unix_path()
994 .ok_or(ConnectionError::BrokerTransportMismatch)?;
995 let socket = UnixStream::connect(Path::new(file)).await?;
996 let network = Network::new(
997 socket,
998 options.max_incoming_packet_size,
999 options.max_outgoing_packet_size,
1000 );
1001 return Ok(network);
1002 }
1003
1004 let (domain, port) = match &transport {
1006 #[cfg(feature = "websocket")]
1007 Transport::Ws => split_url(
1008 options
1009 .broker()
1010 .websocket_url()
1011 .ok_or(ConnectionError::BrokerTransportMismatch)?,
1012 )?,
1013 #[cfg(all(
1014 any(feature = "use-rustls-no-provider", feature = "use-native-tls"),
1015 feature = "websocket"
1016 ))]
1017 Transport::Wss(_) => split_url(
1018 options
1019 .broker()
1020 .websocket_url()
1021 .ok_or(ConnectionError::BrokerTransportMismatch)?,
1022 )?,
1023 _ => options
1024 .broker()
1025 .tcp_address()
1026 .map(|(host, port)| (host.to_owned(), port))
1027 .ok_or(ConnectionError::BrokerTransportMismatch)?,
1028 };
1029
1030 let tcp_stream: Box<dyn AsyncReadWrite> = {
1031 #[cfg(feature = "proxy")]
1032 if let Some(proxy) = options.proxy() {
1033 proxy
1034 .connect(
1035 &domain,
1036 port,
1037 network_options,
1038 Some(options.effective_socket_connector()),
1039 )
1040 .await?
1041 } else {
1042 let addr = format!("{domain}:{port}");
1043 options.socket_connect(addr, network_options).await?
1044 }
1045 #[cfg(not(feature = "proxy"))]
1046 {
1047 let addr = format!("{domain}:{port}");
1048 options.socket_connect(addr, network_options).await?
1049 }
1050 };
1051
1052 let network = match transport {
1053 Transport::Tcp => Network::new(
1054 tcp_stream,
1055 options.max_incoming_packet_size,
1056 options.max_outgoing_packet_size,
1057 ),
1058 #[cfg(any(feature = "use-rustls-no-provider", feature = "use-native-tls"))]
1059 Transport::Tls(tls_config) => {
1060 let (host, port) = options
1061 .broker()
1062 .tcp_address()
1063 .expect("tls transport requires a tcp broker");
1064 let socket = tls::tls_connect(host, port, &tls_config, tcp_stream).await?;
1065 Network::new(
1066 socket,
1067 options.max_incoming_packet_size,
1068 options.max_outgoing_packet_size,
1069 )
1070 }
1071 #[cfg(unix)]
1072 Transport::Unix => unreachable!(),
1073 #[cfg(feature = "websocket")]
1074 Transport::Ws => {
1075 let mut request = options
1076 .broker()
1077 .websocket_url()
1078 .expect("ws transport requires a websocket broker")
1079 .into_client_request()?;
1080 request
1081 .headers_mut()
1082 .insert("Sec-WebSocket-Protocol", "mqtt".parse().unwrap());
1083
1084 if let Some(request_modifier) = options.fallible_request_modifier() {
1085 request = request_modifier(request)
1086 .await
1087 .map_err(ConnectionError::RequestModifier)?;
1088 } else if let Some(request_modifier) = options.request_modifier() {
1089 request = request_modifier(request).await;
1090 }
1091
1092 let (socket, response) =
1093 async_tungstenite::tokio::client_async(request, tcp_stream).await?;
1094 validate_response_headers(response)?;
1095
1096 Network::new(
1097 WsAdapter::new(socket),
1098 options.max_incoming_packet_size,
1099 options.max_outgoing_packet_size,
1100 )
1101 }
1102 #[cfg(all(
1103 any(feature = "use-rustls-no-provider", feature = "use-native-tls"),
1104 feature = "websocket"
1105 ))]
1106 Transport::Wss(tls_config) => {
1107 let mut request = options
1108 .broker()
1109 .websocket_url()
1110 .expect("wss transport requires a websocket broker")
1111 .into_client_request()?;
1112 request
1113 .headers_mut()
1114 .insert("Sec-WebSocket-Protocol", "mqtt".parse().unwrap());
1115
1116 if let Some(request_modifier) = options.fallible_request_modifier() {
1117 request = request_modifier(request)
1118 .await
1119 .map_err(ConnectionError::RequestModifier)?;
1120 } else if let Some(request_modifier) = options.request_modifier() {
1121 request = request_modifier(request).await;
1122 }
1123
1124 let tls_stream = tls::tls_connect(&domain, port, &tls_config, tcp_stream).await?;
1125 let (socket, response) =
1126 async_tungstenite::tokio::client_async(request, tls_stream).await?;
1127 validate_response_headers(response)?;
1128
1129 Network::new(
1130 WsAdapter::new(socket),
1131 options.max_incoming_packet_size,
1132 options.max_outgoing_packet_size,
1133 )
1134 }
1135 };
1136
1137 Ok(network)
1138}
1139
1140async fn mqtt_connect(
1141 options: &MqttOptions,
1142 network: &mut Network,
1143) -> Result<ConnAck, ConnectionError> {
1144 let mut connect = Connect::new(options.client_id());
1145 connect.keep_alive = u16::try_from(options.keep_alive().as_secs()).unwrap_or(u16::MAX);
1146 connect.clean_session = options.clean_session();
1147 connect.last_will = options.last_will();
1148 connect.auth = options.auth().clone();
1149
1150 network.write(Packet::Connect(connect)).await?;
1152 network.flush().await?;
1153
1154 match network.read().await? {
1156 Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => Ok(connack),
1157 Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)),
1158 packet => Err(ConnectionError::NotConnAck(Box::new(packet))),
1159 }
1160}
1161
1162const fn should_replay_after_reconnect(request: &Request) -> bool {
1163 !matches!(request, Request::PubAck(_) | Request::PubRec(_))
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168 use super::*;
1169 use crate::mqttbytes::QoS;
1170 use crate::{Disconnect, PubAck, PubComp, PubRec, PubRel};
1171 use flume::TryRecvError;
1172
1173 fn push_pending(eventloop: &mut EventLoop, request: Request) {
1174 eventloop.pending.push_back(RequestEnvelope::plain(request));
1175 }
1176
1177 fn pending_front_request(eventloop: &EventLoop) -> Option<&Request> {
1178 eventloop.pending.front().map(|envelope| &envelope.request)
1179 }
1180
1181 fn build_eventloop_with_pending(clean_session: bool) -> EventLoop {
1182 let mut options = MqttOptions::new("test-client", "localhost");
1183 options.set_clean_session(clean_session);
1184
1185 let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 1);
1186 push_pending(&mut eventloop, Request::PingReq(PingReq));
1187 eventloop
1188 }
1189
1190 fn publish(qos: QoS) -> Publish {
1191 Publish::new("hello/world", qos, "payload")
1192 }
1193
1194 fn fill_publish_window(eventloop: &mut EventLoop) {
1195 let mut active = publish(QoS::AtLeastOnce);
1196 active.pkid = 1;
1197 eventloop.state.outgoing_pub[1] = Some(active);
1198 eventloop.state.inflight = 1;
1199 }
1200
1201 fn next_after_blocked_publish(request: Request) -> Option<Request> {
1202 let mut options = MqttOptions::new("test-client", "localhost");
1203 options.set_inflight(1);
1204 let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 1);
1205 fill_publish_window(&mut eventloop);
1206 eventloop
1207 .queued
1208 .push_back(RequestEnvelope::plain(Request::Publish(publish(
1209 QoS::AtLeastOnce,
1210 ))));
1211 eventloop.queued.push_back(RequestEnvelope::plain(request));
1212
1213 eventloop
1214 .next_scheduled_request()
1215 .map(|(request, _notice)| request)
1216 }
1217
1218 #[test]
1219 fn scheduler_sends_control_after_quota_blocked_publish() {
1220 let mut options = MqttOptions::new("test-client", "localhost");
1221 options.set_inflight(1);
1222 let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 1);
1223 fill_publish_window(&mut eventloop);
1224 eventloop
1225 .queued
1226 .push_back(RequestEnvelope::plain(Request::Publish(publish(
1227 QoS::AtLeastOnce,
1228 ))));
1229 eventloop
1230 .queued
1231 .push_back(RequestEnvelope::plain(Request::Subscribe(Subscribe::new(
1232 "hello/world",
1233 QoS::AtMostOnce,
1234 ))));
1235
1236 let (request, _) = eventloop.next_scheduled_request().unwrap();
1237
1238 assert!(matches!(request, Request::Subscribe(_)));
1239 match eventloop.state.handle_outgoing_packet(request).unwrap() {
1240 Some(Packet::Subscribe(subscribe)) => assert_ne!(subscribe.pkid, 1),
1241 packet => panic!("expected subscribe packet, got {packet:?}"),
1242 }
1243 assert!(matches!(
1244 eventloop
1245 .queued
1246 .drain()
1247 .next()
1248 .map(|envelope| envelope.request),
1249 Some(Request::Publish(_))
1250 ));
1251 }
1252
1253 #[test]
1254 fn scheduler_does_not_let_qos0_publish_pass_blocked_publish() {
1255 let mut options = MqttOptions::new("test-client", "localhost");
1256 options.set_inflight(1);
1257 let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 1);
1258 fill_publish_window(&mut eventloop);
1259 eventloop
1260 .queued
1261 .push_back(RequestEnvelope::plain(Request::Publish(publish(
1262 QoS::AtLeastOnce,
1263 ))));
1264 eventloop
1265 .queued
1266 .push_back(RequestEnvelope::plain(Request::Publish(publish(
1267 QoS::AtMostOnce,
1268 ))));
1269
1270 assert!(eventloop.next_scheduled_request().is_none());
1271 }
1272
1273 #[test]
1274 fn scheduler_allows_each_progress_and_control_class_after_blocked_publish() {
1275 assert!(matches!(
1276 next_after_blocked_publish(Request::PubAck(PubAck::new(7))),
1277 Some(Request::PubAck(_))
1278 ));
1279 assert!(matches!(
1280 next_after_blocked_publish(Request::PubRec(PubRec::new(8))),
1281 Some(Request::PubRec(_))
1282 ));
1283 assert!(matches!(
1284 next_after_blocked_publish(Request::PubRel(PubRel::new(9))),
1285 Some(Request::PubRel(_))
1286 ));
1287 assert!(matches!(
1288 next_after_blocked_publish(Request::PubComp(PubComp::new(10))),
1289 Some(Request::PubComp(_))
1290 ));
1291 assert!(matches!(
1292 next_after_blocked_publish(Request::PingReq(PingReq)),
1293 Some(Request::PingReq(_))
1294 ));
1295 assert!(matches!(
1296 next_after_blocked_publish(Request::Subscribe(Subscribe::new(
1297 "hello/world",
1298 QoS::AtMostOnce
1299 ))),
1300 Some(Request::Subscribe(_))
1301 ));
1302 assert!(matches!(
1303 next_after_blocked_publish(Request::Unsubscribe(Unsubscribe::new("hello/world"))),
1304 Some(Request::Unsubscribe(_))
1305 ));
1306 assert!(matches!(
1307 next_after_blocked_publish(Request::Disconnect(Disconnect)),
1308 Some(Request::Disconnect(_))
1309 ));
1310 }
1311
1312 #[test]
1313 fn scheduler_unsubscribe_after_blocked_publish_uses_non_conflicting_packet_id() {
1314 let mut options = MqttOptions::new("test-client", "localhost");
1315 options.set_inflight(1);
1316 let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 1);
1317 fill_publish_window(&mut eventloop);
1318 eventloop
1319 .queued
1320 .push_back(RequestEnvelope::plain(Request::Publish(publish(
1321 QoS::AtLeastOnce,
1322 ))));
1323 eventloop
1324 .queued
1325 .push_back(RequestEnvelope::plain(Request::Unsubscribe(
1326 Unsubscribe::new("hello/world"),
1327 )));
1328
1329 let (request, _) = eventloop.next_scheduled_request().unwrap();
1330
1331 assert!(matches!(request, Request::Unsubscribe(_)));
1332 match eventloop.state.handle_outgoing_packet(request).unwrap() {
1333 Some(Packet::Unsubscribe(unsubscribe)) => assert_ne!(unsubscribe.pkid, 1),
1334 packet => panic!("expected unsubscribe packet, got {packet:?}"),
1335 }
1336 }
1337
1338 #[test]
1339 fn scheduler_preserves_sendable_publish_before_later_control() {
1340 let (mut eventloop, _request_tx) =
1341 EventLoop::new_for_async_client(MqttOptions::new("test-client", "localhost"), 1);
1342 eventloop
1343 .queued
1344 .push_back(RequestEnvelope::plain(Request::Publish(publish(
1345 QoS::AtLeastOnce,
1346 ))));
1347 eventloop
1348 .queued
1349 .push_back(RequestEnvelope::plain(Request::Subscribe(Subscribe::new(
1350 "hello/world",
1351 QoS::AtMostOnce,
1352 ))));
1353
1354 let (request, _) = eventloop.next_scheduled_request().unwrap();
1355
1356 assert!(matches!(request, Request::Publish(_)));
1357 }
1358
1359 #[tokio::test]
1360 async fn select_admits_control_request_after_ready_publish_backlog_snapshot() {
1361 let mut options = MqttOptions::new("test-client", "localhost");
1362 options.set_max_request_batch(1);
1363 let (mut eventloop, request_tx, control_request_tx, _immediate_disconnect_tx) =
1364 EventLoop::new_for_async_client_with_capacity(
1365 options,
1366 RequestChannelCapacity::Unbounded,
1367 );
1368 let (client, _peer) = tokio::io::duplex(1024);
1369 eventloop.network = Some(Network::new(client, 1024, 1024));
1370
1371 request_tx
1372 .send_async(RequestEnvelope::plain(Request::Publish(publish(
1373 QoS::AtMostOnce,
1374 ))))
1375 .await
1376 .unwrap();
1377 request_tx
1378 .send_async(RequestEnvelope::plain(Request::Publish(publish(
1379 QoS::AtMostOnce,
1380 ))))
1381 .await
1382 .unwrap();
1383 control_request_tx
1384 .send_async(RequestEnvelope::plain(Request::Disconnect(Disconnect)))
1385 .await
1386 .unwrap();
1387
1388 let first = time::timeout(Duration::from_secs(1), eventloop.select())
1389 .await
1390 .expect("timed out waiting for first request")
1391 .expect("select should not fail");
1392 request_tx
1393 .send_async(RequestEnvelope::plain(Request::Publish(publish(
1394 QoS::AtMostOnce,
1395 ))))
1396 .await
1397 .unwrap();
1398 let second = time::timeout(Duration::from_secs(1), eventloop.select())
1399 .await
1400 .expect("timed out waiting for second request")
1401 .expect("select should not fail");
1402 let third = time::timeout(Duration::from_secs(1), eventloop.select())
1403 .await
1404 .expect("timed out waiting for control request")
1405 .expect("select should not fail");
1406
1407 assert!(matches!(first, Event::Outgoing(Outgoing::Publish(_))));
1408 assert!(matches!(second, Event::Outgoing(Outgoing::Publish(_))));
1409 assert!(matches!(third, Event::Outgoing(Outgoing::Disconnect)));
1410 }
1411
1412 #[test]
1413 fn eventloop_new_keeps_internal_sender_alive() {
1414 let options = MqttOptions::new("test-client", "localhost");
1415 let eventloop = EventLoop::new(options, 1);
1416
1417 assert!(matches!(
1418 eventloop.requests_rx.try_recv(),
1419 Err(TryRecvError::Empty)
1420 ));
1421 }
1422
1423 #[test]
1424 fn async_client_constructor_path_allows_channel_shutdown() {
1425 let options = MqttOptions::new("test-client", "localhost");
1426 let (eventloop, request_tx) = EventLoop::new_for_async_client(options, 1);
1427 drop(request_tx);
1428
1429 assert!(matches!(
1430 eventloop.requests_rx.try_recv(),
1431 Err(TryRecvError::Disconnected)
1432 ));
1433 }
1434
1435 #[test]
1436 fn clean_drops_ack_requests_drained_from_channel() {
1437 let options = MqttOptions::new("test-client", "localhost");
1438 let (mut eventloop, request_tx) = EventLoop::new_for_async_client(options, 3);
1439 request_tx
1440 .send(RequestEnvelope::plain(Request::PubAck(PubAck::new(7))))
1441 .unwrap();
1442 request_tx
1443 .send(RequestEnvelope::plain(Request::PubRec(PubRec::new(8))))
1444 .unwrap();
1445 request_tx
1446 .send(RequestEnvelope::plain(Request::PingReq(PingReq)))
1447 .unwrap();
1448
1449 eventloop.clean();
1450
1451 assert_eq!(eventloop.pending_len(), 1);
1452 assert!(matches!(
1453 pending_front_request(&eventloop),
1454 Some(Request::PingReq(_))
1455 ));
1456 }
1457
1458 #[test]
1459 fn clean_drops_ack_requests_drained_from_queued_scheduler() {
1460 let options = MqttOptions::new("test-client", "localhost");
1461 let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 3);
1462 eventloop
1463 .queued
1464 .push_back(RequestEnvelope::plain(Request::PubAck(PubAck::new(7))));
1465 eventloop
1466 .queued
1467 .push_back(RequestEnvelope::plain(Request::PubRec(PubRec::new(8))));
1468 eventloop
1469 .queued
1470 .push_back(RequestEnvelope::plain(Request::PingReq(PingReq)));
1471
1472 eventloop.clean();
1473
1474 assert_eq!(eventloop.pending_len(), 1);
1475 assert!(matches!(
1476 pending_front_request(&eventloop),
1477 Some(Request::PingReq(_))
1478 ));
1479 }
1480
1481 #[tokio::test]
1482 #[cfg(unix)]
1483 async fn network_connect_rejects_unix_broker_with_tcp_transport() {
1484 let mut options = MqttOptions::new("test-client", crate::Broker::unix("/tmp/mqtt.sock"));
1485 options.set_transport(Transport::tcp());
1486
1487 match network_connect(&options, NetworkOptions::new()).await {
1488 Err(ConnectionError::BrokerTransportMismatch) => {}
1489 Err(err) => panic!("unexpected error: {err:?}"),
1490 Ok(_) => panic!("mismatched broker and transport should fail"),
1491 }
1492 }
1493
1494 #[tokio::test]
1495 #[cfg(feature = "websocket")]
1496 async fn network_connect_rejects_tcp_broker_with_websocket_transport() {
1497 let mut options = MqttOptions::new("test-client", "localhost");
1498 options.set_transport(Transport::Ws);
1499
1500 match network_connect(&options, NetworkOptions::new()).await {
1501 Err(ConnectionError::BrokerTransportMismatch) => {}
1502 Err(err) => panic!("unexpected error: {err:?}"),
1503 Ok(_) => panic!("mismatched broker and transport should fail"),
1504 }
1505 }
1506
1507 #[tokio::test]
1508 #[cfg(feature = "websocket")]
1509 async fn network_connect_rejects_websocket_broker_with_tcp_transport() {
1510 let broker = crate::Broker::websocket("ws://localhost:9001/mqtt").unwrap();
1511 let mut options = MqttOptions::new("test-client", broker);
1512 options.set_transport(Transport::tcp());
1513
1514 match network_connect(&options, NetworkOptions::new()).await {
1515 Err(ConnectionError::BrokerTransportMismatch) => {}
1516 Err(err) => panic!("unexpected error: {err:?}"),
1517 Ok(_) => panic!("mismatched broker and transport should fail"),
1518 }
1519 }
1520
1521 #[tokio::test]
1522 async fn async_client_path_reports_requests_done_after_pending_drain() {
1523 let options = MqttOptions::new("test-client", "localhost");
1524 let (mut eventloop, request_tx) = EventLoop::new_for_async_client(options, 1);
1525 push_pending(&mut eventloop, Request::PingReq(PingReq));
1526 drop(request_tx);
1527
1528 let request = EventLoop::next_request(
1529 &mut eventloop.pending,
1530 &eventloop.requests_rx,
1531 Duration::ZERO,
1532 )
1533 .await
1534 .unwrap();
1535 assert!(matches!(request, (Request::PingReq(_), None)));
1536
1537 let err = EventLoop::next_request(
1538 &mut eventloop.pending,
1539 &eventloop.requests_rx,
1540 Duration::ZERO,
1541 )
1542 .await
1543 .unwrap_err();
1544 assert!(matches!(err, ConnectionError::RequestsDone));
1545 }
1546
1547 #[tokio::test]
1548 async fn next_request_is_cancellation_safe_for_pending_queue() {
1549 let options = MqttOptions::new("test-client", "localhost");
1550 let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 1);
1551 push_pending(&mut eventloop, Request::PingReq(PingReq));
1552
1553 let delayed = EventLoop::next_request(
1554 &mut eventloop.pending,
1555 &eventloop.requests_rx,
1556 Duration::from_millis(50),
1557 );
1558 let timed_out = time::timeout(Duration::from_millis(5), delayed).await;
1559
1560 assert!(timed_out.is_err());
1561 assert!(matches!(
1562 pending_front_request(&eventloop),
1563 Some(Request::PingReq(_))
1564 ));
1565 }
1566
1567 #[tokio::test]
1568 async fn try_next_request_applies_pending_throttle_for_followup_pending_item() {
1569 let options = MqttOptions::new("test-client", "localhost");
1570 let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 2);
1571 push_pending(&mut eventloop, Request::PingReq(PingReq));
1572 push_pending(&mut eventloop, Request::Disconnect(Disconnect));
1573
1574 let first = EventLoop::next_request(
1575 &mut eventloop.pending,
1576 &eventloop.requests_rx,
1577 Duration::ZERO,
1578 )
1579 .await
1580 .unwrap();
1581 assert!(matches!(first, (Request::PingReq(_), None)));
1582
1583 let delayed = EventLoop::try_next_request(
1584 &mut eventloop.pending,
1585 &eventloop.requests_rx,
1586 Duration::from_millis(50),
1587 );
1588 let timed_out = time::timeout(Duration::from_millis(5), delayed).await;
1589
1590 assert!(timed_out.is_err());
1591 assert!(matches!(
1592 pending_front_request(&eventloop),
1593 Some(Request::Disconnect(_))
1594 ));
1595 }
1596
1597 #[tokio::test]
1598 async fn try_next_request_does_not_throttle_when_pending_queue_is_empty() {
1599 let options = MqttOptions::new("test-client", "localhost");
1600 let (mut eventloop, request_tx) = EventLoop::new_for_async_client(options, 1);
1601 request_tx
1602 .send_async(RequestEnvelope::plain(Request::PingReq(PingReq)))
1603 .await
1604 .unwrap();
1605
1606 let received = time::timeout(
1607 Duration::from_millis(20),
1608 EventLoop::try_next_request(
1609 &mut eventloop.pending,
1610 &eventloop.requests_rx,
1611 Duration::from_secs(1),
1612 ),
1613 )
1614 .await
1615 .unwrap();
1616
1617 assert!(matches!(received, Some((Request::PingReq(_), None))));
1618 }
1619
1620 #[tokio::test]
1621 async fn next_request_prioritizes_pending_over_channel_messages() {
1622 let options = MqttOptions::new("test-client", "localhost");
1623 let (mut eventloop, request_tx) = EventLoop::new_for_async_client(options, 2);
1624 push_pending(&mut eventloop, Request::PingReq(PingReq));
1625 request_tx
1626 .send_async(RequestEnvelope::plain(Request::PingReq(PingReq)))
1627 .await
1628 .unwrap();
1629
1630 let first = EventLoop::next_request(
1631 &mut eventloop.pending,
1632 &eventloop.requests_rx,
1633 Duration::ZERO,
1634 )
1635 .await
1636 .unwrap();
1637 assert!(matches!(first, (Request::PingReq(_), None)));
1638 assert!(eventloop.pending.is_empty());
1639
1640 let second = EventLoop::next_request(
1641 &mut eventloop.pending,
1642 &eventloop.requests_rx,
1643 Duration::ZERO,
1644 )
1645 .await
1646 .unwrap();
1647 assert!(matches!(second, (Request::PingReq(_), None)));
1648 }
1649
1650 #[tokio::test]
1651 async fn next_request_preserves_fifo_order_for_plain_and_tracked_requests() {
1652 let options = MqttOptions::new("test-client", "localhost");
1653 let (mut eventloop, request_tx) = EventLoop::new_for_async_client(options, 4);
1654 let (notice_tx, _notice) = PublishNoticeTx::new();
1655 let tracked_publish =
1656 Publish::new("hello/world", crate::mqttbytes::QoS::AtLeastOnce, "payload");
1657
1658 request_tx
1659 .send_async(RequestEnvelope::plain(Request::PingReq(PingReq)))
1660 .await
1661 .unwrap();
1662 request_tx
1663 .send_async(RequestEnvelope::tracked_publish(
1664 tracked_publish.clone(),
1665 notice_tx,
1666 ))
1667 .await
1668 .unwrap();
1669 request_tx
1670 .send_async(RequestEnvelope::plain(Request::Disconnect(Disconnect)))
1671 .await
1672 .unwrap();
1673
1674 let first = EventLoop::next_request(
1675 &mut eventloop.pending,
1676 &eventloop.requests_rx,
1677 Duration::ZERO,
1678 )
1679 .await
1680 .unwrap();
1681 assert!(matches!(first, (Request::PingReq(_), None)));
1682
1683 let second = EventLoop::next_request(
1684 &mut eventloop.pending,
1685 &eventloop.requests_rx,
1686 Duration::ZERO,
1687 )
1688 .await
1689 .unwrap();
1690 assert!(matches!(
1691 second,
1692 (Request::Publish(publish), Some(_)) if publish == tracked_publish
1693 ));
1694
1695 let third = EventLoop::next_request(
1696 &mut eventloop.pending,
1697 &eventloop.requests_rx,
1698 Duration::ZERO,
1699 )
1700 .await
1701 .unwrap();
1702 assert!(matches!(third, (Request::Disconnect(_), None)));
1703 }
1704
1705 #[tokio::test]
1706 async fn tracked_qos0_notice_reports_not_flushed_on_first_write_failure() {
1707 let options = MqttOptions::new("test-client", "localhost");
1708 let (mut eventloop, request_tx) = EventLoop::new_for_async_client(options, 4);
1709 let (client, _peer) = tokio::io::duplex(1024);
1710 eventloop.network = Some(Network::new(client, 1024, 16));
1711
1712 let (notice_tx, notice) = PublishNoticeTx::new();
1713 let publish = Publish::new(
1714 "hello/world",
1715 crate::mqttbytes::QoS::AtMostOnce,
1716 vec![1; 128],
1717 );
1718 request_tx
1719 .send_async(RequestEnvelope::tracked_publish(publish, notice_tx))
1720 .await
1721 .unwrap();
1722
1723 let err = eventloop.select().await.unwrap_err();
1724 assert!(matches!(err, ConnectionError::MqttState(_)));
1725 assert_eq!(
1726 notice.wait_async().await.unwrap_err(),
1727 PublishNoticeError::Qos0NotFlushed
1728 );
1729 }
1730
1731 #[tokio::test]
1732 async fn tracked_qos0_notices_report_not_flushed_on_batched_write_failure() {
1733 let mut options = MqttOptions::new("test-client", "localhost");
1734 options.set_max_request_batch(2);
1735 let (mut eventloop, request_tx) = EventLoop::new_for_async_client(options, 4);
1736 let (client, _peer) = tokio::io::duplex(1024);
1737 eventloop.network = Some(Network::new(client, 1024, 80));
1738
1739 let small_publish = Publish::new("hello/world", crate::mqttbytes::QoS::AtMostOnce, vec![1]);
1740 let large_publish = Publish::new(
1741 "hello/world",
1742 crate::mqttbytes::QoS::AtMostOnce,
1743 vec![2; 256],
1744 );
1745
1746 let (first_notice_tx, first_notice) = PublishNoticeTx::new();
1747 request_tx
1748 .send_async(RequestEnvelope::tracked_publish(
1749 small_publish,
1750 first_notice_tx,
1751 ))
1752 .await
1753 .unwrap();
1754
1755 let (second_notice_tx, second_notice) = PublishNoticeTx::new();
1756 request_tx
1757 .send_async(RequestEnvelope::tracked_publish(
1758 large_publish,
1759 second_notice_tx,
1760 ))
1761 .await
1762 .unwrap();
1763
1764 let err = eventloop.select().await.unwrap_err();
1765 assert!(matches!(err, ConnectionError::MqttState(_)));
1766 assert_eq!(
1767 first_notice.wait_async().await.unwrap_err(),
1768 PublishNoticeError::Qos0NotFlushed
1769 );
1770 assert_eq!(
1771 second_notice.wait_async().await.unwrap_err(),
1772 PublishNoticeError::Qos0NotFlushed
1773 );
1774 }
1775
1776 #[tokio::test]
1777 async fn drain_pending_as_failed_drains_all_and_returns_count() {
1778 let options = MqttOptions::new("test-client", "localhost");
1779 let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 1);
1780 let (notice_tx, notice) = PublishNoticeTx::new();
1781 let publish = Publish::new("hello/world", crate::mqttbytes::QoS::AtLeastOnce, "payload");
1782 eventloop
1783 .pending
1784 .push_back(RequestEnvelope::tracked_publish(publish, notice_tx));
1785 eventloop
1786 .pending
1787 .push_back(RequestEnvelope::plain(Request::PingReq(PingReq)));
1788
1789 let drained = eventloop.drain_pending_as_failed(NoticeFailureReason::SessionReset);
1790
1791 assert_eq!(drained, 2);
1792 assert!(eventloop.pending.is_empty());
1793 assert_eq!(
1794 notice.wait_async().await.unwrap_err(),
1795 PublishNoticeError::SessionReset
1796 );
1797 }
1798
1799 #[tokio::test]
1800 async fn drain_pending_as_failed_reports_session_reset_for_tracked_notices() {
1801 let options = MqttOptions::new("test-client", "localhost");
1802 let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 1);
1803 let (publish_notice_tx, publish_notice) = PublishNoticeTx::new();
1804 let publish = Publish::new("hello/world", crate::mqttbytes::QoS::AtLeastOnce, "payload");
1805 eventloop
1806 .pending
1807 .push_back(RequestEnvelope::tracked_publish(publish, publish_notice_tx));
1808
1809 let (request_notice_tx, request_notice) = SubscribeNoticeTx::new();
1810 let subscribe = Subscribe::new("hello/world", crate::mqttbytes::QoS::AtMostOnce);
1811 eventloop
1812 .pending
1813 .push_back(RequestEnvelope::tracked_subscribe(
1814 subscribe,
1815 request_notice_tx,
1816 ));
1817
1818 eventloop.drain_pending_as_failed(NoticeFailureReason::SessionReset);
1819
1820 assert_eq!(
1821 publish_notice.wait_async().await.unwrap_err(),
1822 PublishNoticeError::SessionReset
1823 );
1824 assert_eq!(
1825 request_notice.wait_async().await.unwrap_err(),
1826 crate::SubscribeNoticeError::SessionReset
1827 );
1828 }
1829
1830 #[tokio::test]
1831 async fn reset_session_state_reports_session_reset_for_pending_tracked_notice() {
1832 let options = MqttOptions::new("test-client", "localhost");
1833 let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 1);
1834 let (notice_tx, notice) = PublishNoticeTx::new();
1835 let publish = Publish::new("hello/world", crate::mqttbytes::QoS::AtLeastOnce, "payload");
1836 eventloop
1837 .pending
1838 .push_back(RequestEnvelope::tracked_publish(publish, notice_tx));
1839
1840 eventloop.reset_session_state();
1841
1842 assert!(eventloop.pending.is_empty());
1843 assert_eq!(
1844 notice.wait_async().await.unwrap_err(),
1845 PublishNoticeError::SessionReset
1846 );
1847 }
1848
1849 #[test]
1850 fn connack_reconcile_rejects_clean_session_with_session_present() {
1851 let mut eventloop = build_eventloop_with_pending(true);
1852
1853 let err = eventloop.reconcile_connack_session(true).unwrap_err();
1854
1855 assert!(matches!(
1856 err,
1857 ConnectionError::SessionStateMismatch {
1858 clean_session: true,
1859 session_present: true
1860 }
1861 ));
1862 assert_eq!(eventloop.pending_len(), 1);
1863 }
1864
1865 #[test]
1866 fn connack_reconcile_resets_pending_when_clean_session_gets_new_session() {
1867 let mut eventloop = build_eventloop_with_pending(true);
1868
1869 eventloop.reconcile_connack_session(false).unwrap();
1870
1871 assert!(eventloop.pending_is_empty());
1872 }
1873
1874 #[test]
1875 fn connack_reconcile_resets_pending_when_resumed_session_is_missing() {
1876 let mut eventloop = build_eventloop_with_pending(false);
1877
1878 eventloop.reconcile_connack_session(false).unwrap();
1879
1880 assert!(eventloop.pending_is_empty());
1881 }
1882
1883 #[test]
1884 fn connack_reconcile_keeps_pending_when_resumed_session_exists() {
1885 let mut eventloop = build_eventloop_with_pending(false);
1886
1887 eventloop.reconcile_connack_session(true).unwrap();
1888
1889 assert_eq!(eventloop.pending_len(), 1);
1890 }
1891}