Skip to main content

rumqttc/
eventloop.rs

1use crate::notice::{
2    PublishNoticeTx, PublishResult, SubscribeNoticeTx, TrackedNoticeTx, UnsubscribeNoticeTx,
3};
4use crate::{Incoming, MqttState, NetworkOptions, Packet, Request, StateError};
5use crate::{MqttOptions, Outgoing};
6use crate::{NoticeFailureReason, PublishNoticeError};
7use crate::{Transport, framed::Network};
8
9use crate::framed::AsyncReadWrite;
10use crate::mqttbytes::v4::{
11    ConnAck, Connect, ConnectReturnCode, PingReq, Publish, Subscribe, Unsubscribe,
12};
13use flume::{Receiver, Sender, TryRecvError, bounded, unbounded};
14use rumqttc_core::{OutboundScheduler, RequestClass, RequestReadiness, ScheduledRequest};
15use tokio::select;
16use tokio::time::{self, Instant, Sleep};
17
18use std::collections::VecDeque;
19use std::io;
20use std::pin::Pin;
21use std::time::Duration;
22
23#[cfg(unix)]
24use {std::path::Path, tokio::net::UnixStream};
25
26#[cfg(any(feature = "use-rustls-no-provider", feature = "use-native-tls"))]
27use crate::tls;
28
29#[cfg(feature = "websocket")]
30use {
31    crate::websockets::WsAdapter,
32    crate::websockets::{UrlError, split_url, validate_response_headers},
33    async_tungstenite::tungstenite::client::IntoClientRequest,
34};
35
36#[cfg(feature = "proxy")]
37use crate::proxy::ProxyError;
38
39#[derive(Debug)]
40pub struct RequestEnvelope {
41    request: Request,
42    notice: Option<TrackedNoticeTx>,
43}
44
45impl RequestEnvelope {
46    pub(crate) const fn from_parts(request: Request, notice: Option<TrackedNoticeTx>) -> Self {
47        Self { request, notice }
48    }
49
50    pub(crate) const fn plain(request: Request) -> Self {
51        Self {
52            request,
53            notice: None,
54        }
55    }
56
57    pub(crate) const fn tracked_publish(publish: Publish, notice: PublishNoticeTx) -> Self {
58        Self {
59            request: Request::Publish(publish),
60            notice: Some(TrackedNoticeTx::Publish(notice)),
61        }
62    }
63
64    pub(crate) const fn tracked_subscribe(subscribe: Subscribe, notice: SubscribeNoticeTx) -> Self {
65        Self {
66            request: Request::Subscribe(subscribe),
67            notice: Some(TrackedNoticeTx::Subscribe(notice)),
68        }
69    }
70
71    pub(crate) const fn tracked_unsubscribe(
72        unsubscribe: Unsubscribe,
73        notice: UnsubscribeNoticeTx,
74    ) -> Self {
75        Self {
76            request: Request::Unsubscribe(unsubscribe),
77            notice: Some(TrackedNoticeTx::Unsubscribe(notice)),
78        }
79    }
80
81    pub(crate) fn into_parts(self) -> (Request, Option<TrackedNoticeTx>) {
82        (self.request, self.notice)
83    }
84}
85
/// Capacity policy for the request and control channels created by
/// `EventLoop::new_for_async_client_with_capacity`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RequestChannelCapacity {
    /// Channels are created with `flume::bounded` using the given capacity.
    Bounded(usize),
    /// Channels are created with `flume::unbounded`.
    Unbounded,
}
91
/// Bookkeeping for a graceful disconnect that is waiting for outbound work to drain.
struct PendingDisconnect {
    /// Instant after which the drain is abandoned; `None` means there is no time limit.
    deadline: Option<Instant>,
}

impl PendingDisconnect {
    /// Starts tracking a graceful disconnect, optionally bounded by `timeout`.
    ///
    /// The deadline is computed with `checked_add`, so a timeout too large to be
    /// represented as an `Instant` (for example `Duration::MAX`) is treated as
    /// "no deadline" instead of panicking on overflow.
    fn new(timeout: Option<Duration>) -> Self {
        Self {
            deadline: timeout.and_then(|timeout| Instant::now().checked_add(timeout)),
        }
    }
}
103
104/// Critical errors during eventloop polling
105#[derive(Debug, thiserror::Error)]
106pub enum ConnectionError {
107    #[error("Mqtt state: {0}")]
108    MqttState(#[from] StateError),
109    #[error("Network timeout")]
110    NetworkTimeout,
111    #[error("Flush timeout")]
112    FlushTimeout,
113    #[error("Graceful disconnect timed out before outbound protocol state drained")]
114    DisconnectTimeout,
115    #[cfg(feature = "websocket")]
116    #[error("Websocket: {0}")]
117    Websocket(#[from] async_tungstenite::tungstenite::error::Error),
118    #[cfg(feature = "websocket")]
119    #[error("Websocket Connect: {0}")]
120    WsConnect(#[from] http::Error),
121    #[cfg(any(feature = "use-rustls-no-provider", feature = "use-native-tls"))]
122    #[error("TLS: {0}")]
123    Tls(#[from] tls::Error),
124    #[error("I/O: {0}")]
125    Io(#[from] io::Error),
126    #[error("Connection refused, return code: `{0:?}`")]
127    ConnectionRefused(ConnectReturnCode),
128    #[error("Expected ConnAck packet, received: {0:?}")]
129    NotConnAck(Box<Packet>),
130    #[error(
131        "Broker replied with session_present={session_present} for clean_session={clean_session}"
132    )]
133    SessionStateMismatch {
134        clean_session: bool,
135        session_present: bool,
136    },
137    #[error("Broker target is incompatible with the selected transport")]
138    BrokerTransportMismatch,
139    /// All request senders have been dropped. Use `AsyncClient::disconnect` for MQTT-level
140    /// graceful shutdown with a DISCONNECT packet.
141    #[error("Requests done")]
142    RequestsDone,
143    #[cfg(feature = "websocket")]
144    #[error("Invalid Url: {0}")]
145    InvalidUrl(#[from] UrlError),
146    #[cfg(feature = "proxy")]
147    #[error("Proxy Connect: {0}")]
148    Proxy(#[from] ProxyError),
149    #[cfg(feature = "websocket")]
150    #[error("Websocket response validation error: ")]
151    ResponseValidation(#[from] crate::websockets::ValidationError),
152    #[cfg(feature = "websocket")]
153    #[error("Websocket request modifier failed: {0}")]
154    RequestModifier(#[source] Box<dyn std::error::Error + Send + Sync>),
155}
156
/// Eventloop with all the state of a connection
pub struct EventLoop {
    /// Options of the current mqtt connection
    pub mqtt_options: MqttOptions,
    /// Current state of the connection
    pub state: MqttState,
    /// Flow-controlled publish request stream.
    requests_rx: Receiver<RequestEnvelope>,
    /// Control request stream.
    control_requests_rx: Receiver<RequestEnvelope>,
    /// Narrow immediate shutdown stream.
    immediate_disconnect_rx: Receiver<RequestEnvelope>,
    /// Internal request sender retained for compatibility in `EventLoop::new`.
    /// This is intentionally dropped in the client builder path.
    _requests_tx: Option<Sender<RequestEnvelope>>,
    /// Control-channel sender; `Some` when built by `EventLoop::new`, `None` in the
    /// client builder path.
    _control_requests_tx: Option<Sender<RequestEnvelope>>,
    /// Immediate-disconnect sender; `Some` when built by `EventLoop::new`, `None` in the
    /// client builder path.
    _immediate_disconnect_tx: Option<Sender<RequestEnvelope>>,
    /// Pending packets from last session
    pending: VecDeque<RequestEnvelope>,
    /// Requests admitted by the event loop and waiting for protocol scheduling.
    queued: OutboundScheduler<RequestEnvelope>,
    /// Network connection to the broker
    pub network: Option<Network>,
    /// Keep alive time
    keepalive_timeout: Option<Pin<Box<Sleep>>>,
    /// Dummy sleep used as a placeholder in select! when keepalive is disabled.
    /// Initialized once with `Duration::MAX` so that it never fires.
    no_sleep: Option<Pin<Box<Sleep>>>,
    /// Network options used when connecting and flushing (e.g. the connection timeout).
    pub network_options: NetworkOptions,
    /// Graceful disconnect waiting for queued and in-flight outbound work to drain.
    pending_disconnect: Option<PendingDisconnect>,
    /// Set once a disconnect has been sent or has timed out; `poll` then returns
    /// `ConnectionError::RequestsDone`.
    disconnect_complete: bool,
}
189
/// Events which can be yielded by the event loop
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
    /// Incoming packet notification, e.g. the `ConnAck` yielded by `poll` after connecting.
    Incoming(Incoming),
    /// Notification about outgoing activity; see [`Outgoing`] for the possible values.
    Outgoing(Outgoing),
}
196
197impl EventLoop {
198    fn reconcile_connack_session(&mut self, session_present: bool) -> Result<(), ConnectionError> {
199        let clean_session = self.mqtt_options.clean_session();
200        if clean_session && session_present {
201            return Err(ConnectionError::SessionStateMismatch {
202                clean_session,
203                session_present,
204            });
205        }
206
207        if !session_present {
208            self.reset_session_state();
209        }
210
211        Ok(())
212    }
213
214    /// New MQTT `EventLoop`
215    ///
216    /// When connection encounters critical errors (like auth failure), user has a choice to
217    /// access and update `options`, `state` and `requests`.
218    pub fn new(mqtt_options: MqttOptions, cap: usize) -> Self {
219        let (requests_tx, requests_rx) = bounded(cap);
220        let (control_requests_tx, control_requests_rx) = bounded(cap);
221        let (immediate_disconnect_tx, immediate_disconnect_rx) = unbounded();
222        Self::with_channel(
223            mqtt_options,
224            requests_rx,
225            control_requests_rx,
226            immediate_disconnect_rx,
227            Some(requests_tx),
228            Some(control_requests_tx),
229            Some(immediate_disconnect_tx),
230        )
231    }
232
233    /// Internal constructor used by client builders.
234    ///
235    /// Unlike `EventLoop::new`, this does not keep an internal sender handle, so dropping all
236    /// `AsyncClient` handles can terminate polling with `ConnectionError::RequestsDone`.
237    pub(crate) fn new_for_async_client_with_capacity(
238        mqtt_options: MqttOptions,
239        capacity: RequestChannelCapacity,
240    ) -> (
241        Self,
242        Sender<RequestEnvelope>,
243        Sender<RequestEnvelope>,
244        Sender<RequestEnvelope>,
245    ) {
246        let (requests_tx, requests_rx) = match capacity {
247            RequestChannelCapacity::Bounded(cap) => bounded(cap),
248            RequestChannelCapacity::Unbounded => unbounded(),
249        };
250        let (control_requests_tx, control_requests_rx) = match capacity {
251            RequestChannelCapacity::Bounded(cap) => bounded(cap),
252            RequestChannelCapacity::Unbounded => unbounded(),
253        };
254        let (immediate_disconnect_tx, immediate_disconnect_rx) = unbounded();
255        let eventloop = Self::with_channel(
256            mqtt_options,
257            requests_rx,
258            control_requests_rx,
259            immediate_disconnect_rx,
260            None,
261            None,
262            None,
263        );
264        (
265            eventloop,
266            requests_tx,
267            control_requests_tx,
268            immediate_disconnect_tx,
269        )
270    }
271
272    /// Internal bounded constructor retained for tests that already choose a capacity.
273    #[cfg(test)]
274    pub(crate) fn new_for_async_client(
275        mqtt_options: MqttOptions,
276        cap: usize,
277    ) -> (Self, Sender<RequestEnvelope>) {
278        let (eventloop, request_tx, _control_request_tx, _immediate_disconnect_tx) =
279            Self::new_for_async_client_with_capacity(
280                mqtt_options,
281                RequestChannelCapacity::Bounded(cap),
282            );
283        (eventloop, request_tx)
284    }
285
286    fn with_channel(
287        mqtt_options: MqttOptions,
288        requests_rx: Receiver<RequestEnvelope>,
289        control_requests_rx: Receiver<RequestEnvelope>,
290        immediate_disconnect_rx: Receiver<RequestEnvelope>,
291        requests_tx: Option<Sender<RequestEnvelope>>,
292        control_requests_tx: Option<Sender<RequestEnvelope>>,
293        immediate_disconnect_tx: Option<Sender<RequestEnvelope>>,
294    ) -> Self {
295        let pending = VecDeque::new();
296        let max_inflight = mqtt_options.inflight;
297        let manual_acks = mqtt_options.manual_acks;
298
299        Self {
300            mqtt_options,
301            state: MqttState::new_internal(max_inflight, manual_acks),
302            requests_rx,
303            control_requests_rx,
304            immediate_disconnect_rx,
305            _requests_tx: requests_tx,
306            _control_requests_tx: control_requests_tx,
307            _immediate_disconnect_tx: immediate_disconnect_tx,
308            pending,
309            queued: OutboundScheduler::default(),
310            network: None,
311            keepalive_timeout: None,
312            no_sleep: None,
313            network_options: NetworkOptions::new(),
314            pending_disconnect: None,
315            disconnect_complete: false,
316        }
317    }
318
319    /// Last session might contain packets which aren't acked. MQTT says these packets should be
320    /// republished in the next session. Move pending messages from state to eventloop, drops the
321    /// underlying network connection and clears the keepalive timeout if any.
322    ///
323    /// > NOTE: Use only when EventLoop is blocked on network and unable to immediately handle disconnect.
324    /// > Pending requests are managed internally by the event loop.
325    /// > Use [`pending_len`](Self::pending_len) or [`pending_is_empty`](Self::pending_is_empty)
326    /// > for observation-only checks.
327    pub fn clean(&mut self) {
328        self.network = None;
329        self.keepalive_timeout = None;
330        self.pending_disconnect = None;
331        for (request, notice) in self.state.clean_with_notices() {
332            self.pending
333                .push_back(RequestEnvelope::from_parts(request, notice));
334        }
335
336        for envelope in self.queued.drain() {
337            if should_replay_after_reconnect(&envelope.request) {
338                self.pending.push_back(envelope);
339            }
340        }
341
342        // drain requests from channel which weren't yet received
343        for envelope in self.requests_rx.drain() {
344            // Wait for publish retransmission, else the broker could be confused by an unexpected
345            // inbound acknowledgment replayed from a previous connection.
346            if should_replay_after_reconnect(&envelope.request) {
347                self.pending.push_back(envelope);
348            }
349        }
350
351        for envelope in self.control_requests_rx.drain() {
352            // Wait for publish retransmission, else the broker could be confused by an unexpected
353            // inbound acknowledgment replayed from a previous connection.
354            if should_replay_after_reconnect(&envelope.request) {
355                self.pending.push_back(envelope);
356            }
357        }
358    }
359
360    /// Number of pending requests queued for retransmission.
361    pub fn pending_len(&self) -> usize {
362        self.pending.len() + self.queued.len()
363    }
364
365    /// Returns true when there are no pending requests queued for retransmission.
366    pub fn pending_is_empty(&self) -> bool {
367        self.pending.is_empty() && self.queued.is_empty()
368    }
369
370    /// Drains pending retransmission queue and fails tracked notices with the given reason.
371    ///
372    /// Returns the number of pending requests removed from the queue.
373    pub fn drain_pending_as_failed(&mut self, reason: NoticeFailureReason) -> usize {
374        let mut drained = 0;
375        for envelope in self.pending.drain(..) {
376            drained += 1;
377            if let Some(notice) = envelope.notice {
378                match notice {
379                    TrackedNoticeTx::Publish(notice) => {
380                        notice.error(reason.publish_error());
381                    }
382                    TrackedNoticeTx::Subscribe(notice) => {
383                        notice.error(reason.subscribe_error());
384                    }
385                    TrackedNoticeTx::Unsubscribe(notice) => {
386                        notice.error(reason.unsubscribe_error());
387                    }
388                }
389            }
390        }
391        for envelope in self.queued.drain() {
392            drained += 1;
393            if let Some(notice) = envelope.notice {
394                match notice {
395                    TrackedNoticeTx::Publish(notice) => {
396                        notice.error(reason.publish_error());
397                    }
398                    TrackedNoticeTx::Subscribe(notice) => {
399                        notice.error(reason.subscribe_error());
400                    }
401                    TrackedNoticeTx::Unsubscribe(notice) => {
402                        notice.error(reason.unsubscribe_error());
403                    }
404                }
405            }
406        }
407
408        drained
409    }
410
411    /// Clears eventloop and state tracking bound to a previous session.
412    pub fn reset_session_state(&mut self) {
413        self.drain_pending_as_failed(NoticeFailureReason::SessionReset);
414        self.state.fail_pending_notices();
415    }
416
417    /// Yields Next notification or outgoing request and periodically pings
418    /// the broker. Continuing to poll will reconnect to the broker if there is
419    /// a disconnection.
420    /// **NOTE** Don't block this while iterating
421    ///
422    /// # Errors
423    ///
424    /// Returns a [`ConnectionError`] if connecting, reading, writing, or
425    /// protocol handling fails.
426    pub async fn poll(&mut self) -> Result<Event, ConnectionError> {
427        if self.disconnect_complete {
428            return Err(ConnectionError::RequestsDone);
429        }
430
431        if self.network.is_none() {
432            if let Ok(envelope) = self.immediate_disconnect_rx.try_recv() {
433                self.disconnect_complete = true;
434                if let Some(notice) = envelope.notice {
435                    drop(notice);
436                }
437                return Err(ConnectionError::RequestsDone);
438            }
439
440            let (network, connack) = match time::timeout(
441                Duration::from_secs(self.network_options.connection_timeout()),
442                connect(&self.mqtt_options, self.network_options.clone()),
443            )
444            .await
445            {
446                Ok(inner) => inner?,
447                Err(_) => return Err(ConnectionError::NetworkTimeout),
448            };
449            self.reconcile_connack_session(connack.session_present)?;
450            self.network = Some(network);
451
452            if self.keepalive_timeout.is_none() && !self.mqtt_options.keep_alive.is_zero() {
453                self.keepalive_timeout = Some(Box::pin(time::sleep(self.mqtt_options.keep_alive)));
454            }
455
456            return Ok(Event::Incoming(Packet::ConnAck(connack)));
457        }
458
459        match self.select().await {
460            Ok(v) => Ok(v),
461            Err(ConnectionError::DisconnectTimeout) => {
462                self.network = None;
463                self.keepalive_timeout = None;
464                self.pending_disconnect = None;
465                self.disconnect_complete = true;
466                Err(ConnectionError::DisconnectTimeout)
467            }
468            Err(e) => {
469                // MQTT requires that packets pending acknowledgement should be republished on session resume.
470                // Move pending messages from state to eventloop.
471                self.clean();
472                Err(e)
473            }
474        }
475    }
476
477    /// Select on network and requests and generate keepalive pings when necessary
478    #[allow(clippy::too_many_lines)]
479    async fn select(&mut self) -> Result<Event, ConnectionError> {
480        loop {
481            if let Some(event) = self.state.events.pop_front() {
482                return Ok(event);
483            }
484
485            if let Ok(envelope) = self.immediate_disconnect_rx.try_recv() {
486                return self.handle_immediate_disconnect(envelope).await;
487            }
488
489            if self.queued.is_empty()
490                && self.pending.is_empty()
491                && self.pending_disconnect.is_none()
492                && self.requests_rx.is_disconnected()
493                && self.requests_rx.is_empty()
494                && self.control_requests_rx.is_disconnected()
495                && self.control_requests_rx.is_empty()
496                && self.state.outbound_requests_drained()
497            {
498                return Err(ConnectionError::RequestsDone);
499            }
500
501            if self.handle_ready_requests().await? {
502                if let Some(event) = self.state.events.pop_front() {
503                    return Ok(event);
504                }
505                continue;
506            }
507
508            if self.pending_disconnect.is_some() {
509                if self.queued.is_empty() && self.state.outbound_requests_drained() {
510                    return self.send_pending_disconnect().await;
511                }
512
513                if let Some(event) = self.poll_disconnect_drain().await? {
514                    return Ok(event);
515                }
516                continue;
517            }
518
519            let read_batch_size = self.effective_read_batch_size();
520            let normal_request_admission_allowed =
521                self.normal_request_admission_allowed() || !self.pending.is_empty();
522            let no_sleep = self
523                .no_sleep
524                .get_or_insert_with(|| Box::pin(time::sleep(Duration::MAX)));
525
526            return select! {
527                biased;
528                o = self.immediate_disconnect_rx.recv_async(), if !self.immediate_disconnect_rx.is_disconnected() => match o {
529                    Ok(envelope) => self.handle_immediate_disconnect(envelope).await,
530                    Err(_) => continue,
531                },
532                o = self.control_requests_rx.recv_async(),
533                    if self.pending_disconnect.is_none()
534                        && (!self.control_requests_rx.is_empty()
535                            || !self.control_requests_rx.is_disconnected()) => match o {
536                    Ok(envelope) => {
537                        self.try_admit_existing_normal_requests().await;
538                        self.queued.push_back(envelope);
539                        continue;
540                    }
541                    Err(_) => continue,
542                },
543                o = Self::next_request(
544                    &mut self.pending,
545                    &self.requests_rx,
546                    self.mqtt_options.pending_throttle
547                ), if self.pending_disconnect.is_none()
548                    && normal_request_admission_allowed
549                    && (!self.pending.is_empty()
550                        || !self.requests_rx.is_empty()
551                        || !self.requests_rx.is_disconnected()) => match o {
552                    Ok((request, notice)) => {
553                        self.admit_normal_request_batch((request, notice)).await;
554                        continue;
555                    }
556                    Err(_) => continue,
557                },
558                o = self.network.as_mut().unwrap().readb(&mut self.state, read_batch_size) => {
559                    o?;
560                    self.flush_network().await?;
561                    Ok(self.state.events.pop_front().unwrap())
562                },
563                () = self.keepalive_timeout.as_mut().unwrap_or(no_sleep),
564                    if self.keepalive_timeout.is_some() && !self.mqtt_options.keep_alive.is_zero() => {
565                    let timeout = self.keepalive_timeout.as_mut().unwrap();
566                    timeout.as_mut().reset(Instant::now() + self.mqtt_options.keep_alive);
567
568                    let (outgoing, _flush_notice) = self
569                        .state
570                        .handle_outgoing_packet_with_notice(Request::PingReq(PingReq), None)?;
571                    if let Some(outgoing) = outgoing {
572                        self.network.as_mut().unwrap().write(outgoing).await?;
573                    }
574                    self.flush_network().await?;
575                    Ok(self.state.events.pop_front().unwrap())
576                }
577            };
578        }
579    }
580
581    async fn handle_immediate_disconnect(
582        &mut self,
583        envelope: RequestEnvelope,
584    ) -> Result<Event, ConnectionError> {
585        let (request, notice) = envelope.into_parts();
586        let mut should_flush = false;
587        let mut qos0_notices = Vec::new();
588        self.handle_request(request, notice, &mut should_flush, &mut qos0_notices)
589            .await?;
590        self.flush_request_batch(should_flush, qos0_notices).await?;
591        Ok(self.state.events.pop_front().unwrap())
592    }
593
594    async fn handle_ready_requests(&mut self) -> Result<bool, ConnectionError> {
595        let Some((request, notice)) = self.next_scheduled_request() else {
596            return Ok(false);
597        };
598
599        let mut should_flush = false;
600        let mut qos0_notices = Vec::new();
601        if self
602            .handle_request(request, notice, &mut should_flush, &mut qos0_notices)
603            .await?
604        {
605            for _ in 1..self.mqtt_options.max_request_batch.max(1) {
606                let Some((next_request, next_notice)) = self.next_scheduled_request() else {
607                    break;
608                };
609
610                if !self
611                    .handle_request(
612                        next_request,
613                        next_notice,
614                        &mut should_flush,
615                        &mut qos0_notices,
616                    )
617                    .await?
618                {
619                    break;
620                }
621            }
622        }
623
624        self.flush_request_batch(should_flush, qos0_notices).await?;
625        Ok(true)
626    }
627
628    fn next_scheduled_request(&mut self) -> Option<(Request, Option<TrackedNoticeTx>)> {
629        let state = &self.state;
630        self.queued
631            .pop_next(|envelope| classify_request(state, &envelope.request))
632            .map(RequestEnvelope::into_parts)
633    }
634
635    fn normal_request_admission_allowed(&self) -> bool {
636        self.queued.is_empty()
637            || self
638                .queued
639                .has_ready(|envelope| classify_request(&self.state, &envelope.request))
640    }
641
642    async fn try_admit_existing_normal_requests(&mut self) {
643        let ready = self.pending.len() + self.requests_rx.len();
644        for _ in 0..ready {
645            if !(self.normal_request_admission_allowed() || !self.pending.is_empty()) {
646                break;
647            }
648
649            let Some((request, notice)) = Self::try_next_request(
650                &mut self.pending,
651                &self.requests_rx,
652                self.mqtt_options.pending_throttle,
653            )
654            .await
655            else {
656                break;
657            };
658
659            let stop_batch = is_disconnect_request(&request);
660            self.queued
661                .push_back(RequestEnvelope::from_parts(request, notice));
662            if stop_batch {
663                break;
664            }
665        }
666    }
667
668    async fn admit_normal_request_batch(&mut self, first: (Request, Option<TrackedNoticeTx>)) {
669        let (request, notice) = first;
670        let stop_batch = is_disconnect_request(&request);
671        self.queued
672            .push_back(RequestEnvelope::from_parts(request, notice));
673        if stop_batch || !(self.normal_request_admission_allowed() || !self.pending.is_empty()) {
674            return;
675        }
676
677        for _ in 1..self.mqtt_options.max_request_batch.max(1) {
678            let Some((request, notice)) = Self::try_next_request(
679                &mut self.pending,
680                &self.requests_rx,
681                self.mqtt_options.pending_throttle,
682            )
683            .await
684            else {
685                break;
686            };
687            let stop_batch = is_disconnect_request(&request);
688            self.queued
689                .push_back(RequestEnvelope::from_parts(request, notice));
690            if stop_batch {
691                break;
692            }
693        }
694    }
695
696    async fn handle_request(
697        &mut self,
698        request: Request,
699        notice: Option<TrackedNoticeTx>,
700        should_flush: &mut bool,
701        qos0_notices: &mut Vec<PublishNoticeTx>,
702    ) -> Result<bool, ConnectionError> {
703        match request {
704            Request::Disconnect(_) => {
705                self.pending_disconnect = Some(PendingDisconnect::new(None));
706                Ok(false)
707            }
708            Request::DisconnectWithTimeout(_, timeout) => {
709                self.pending_disconnect = Some(PendingDisconnect::new(Some(timeout)));
710                Ok(false)
711            }
712            Request::DisconnectNow(_) => {
713                let (outgoing, _) = self
714                    .state
715                    .handle_outgoing_packet_with_notice(request, notice)?;
716                if let Some(outgoing) = outgoing {
717                    if let Err(err) = self.network.as_mut().unwrap().write(outgoing).await {
718                        return Err(ConnectionError::MqttState(err));
719                    }
720                    *should_flush = true;
721                }
722                self.disconnect_complete = true;
723                Ok(false)
724            }
725            request => {
726                let (outgoing, flush_notice) = self
727                    .state
728                    .handle_outgoing_packet_with_notice(request, notice)?;
729                if let Some(notice) = flush_notice {
730                    qos0_notices.push(notice);
731                }
732                if let Some(outgoing) = outgoing {
733                    if let Err(err) = self.network.as_mut().unwrap().write(outgoing).await {
734                        for notice in qos0_notices.drain(..) {
735                            notice.error(PublishNoticeError::Qos0NotFlushed);
736                        }
737                        return Err(ConnectionError::MqttState(err));
738                    }
739                    *should_flush = true;
740                }
741                Ok(true)
742            }
743        }
744    }
745
746    async fn flush_request_batch(
747        &mut self,
748        should_flush: bool,
749        qos0_notices: Vec<PublishNoticeTx>,
750    ) -> Result<(), ConnectionError> {
751        if !should_flush {
752            return Ok(());
753        }
754
755        match time::timeout(
756            Duration::from_secs(self.network_options.connection_timeout()),
757            self.network.as_mut().unwrap().flush(),
758        )
759        .await
760        {
761            Ok(Ok(())) => {
762                for notice in qos0_notices {
763                    notice.success(PublishResult::Qos0Flushed);
764                }
765                Ok(())
766            }
767            Ok(Err(err)) => {
768                for notice in qos0_notices {
769                    notice.error(PublishNoticeError::Qos0NotFlushed);
770                }
771                Err(ConnectionError::MqttState(err))
772            }
773            Err(_) => {
774                for notice in qos0_notices {
775                    notice.error(PublishNoticeError::Qos0NotFlushed);
776                }
777                Err(ConnectionError::FlushTimeout)
778            }
779        }
780    }
781
782    async fn flush_network(&mut self) -> Result<(), ConnectionError> {
783        time::timeout(
784            Duration::from_secs(self.network_options.connection_timeout()),
785            self.network.as_mut().unwrap().flush(),
786        )
787        .await
788        .map_or_else(
789            |_| Err(ConnectionError::FlushTimeout),
790            |inner| inner.map_err(ConnectionError::MqttState),
791        )
792    }
793
794    async fn poll_disconnect_drain(&mut self) -> Result<Option<Event>, ConnectionError> {
795        let read_batch_size = self.effective_read_batch_size();
796        let read = self
797            .network
798            .as_mut()
799            .unwrap()
800            .readb(&mut self.state, read_batch_size);
801
802        if let Some(deadline) = self
803            .pending_disconnect
804            .as_ref()
805            .and_then(|pending| pending.deadline)
806        {
807            select! {
808                o = self.immediate_disconnect_rx.recv_async(), if !self.immediate_disconnect_rx.is_disconnected() => match o {
809                    Ok(envelope) => return self.handle_immediate_disconnect(envelope).await.map(Some),
810                    Err(_) => return Ok(None),
811                },
812                result = read => result?,
813                () = time::sleep_until(deadline) => return Err(ConnectionError::DisconnectTimeout),
814            }
815        } else {
816            select! {
817                o = self.immediate_disconnect_rx.recv_async(), if !self.immediate_disconnect_rx.is_disconnected() => match o {
818                    Ok(envelope) => return self.handle_immediate_disconnect(envelope).await.map(Some),
819                    Err(_) => return Ok(None),
820                },
821                result = read => result?,
822            }
823        }
824
825        self.flush_network().await?;
826        Ok(None)
827    }
828
829    async fn send_pending_disconnect(&mut self) -> Result<Event, ConnectionError> {
830        self.pending_disconnect = None;
831        let (outgoing, _) = self
832            .state
833            .handle_outgoing_packet_with_notice(Request::DisconnectNow(crate::Disconnect), None)?;
834
835        if let Some(outgoing) = outgoing {
836            self.network.as_mut().unwrap().write(outgoing).await?;
837            self.flush_network().await?;
838        }
839
840        self.disconnect_complete = true;
841        Ok(self.state.events.pop_front().unwrap())
842    }
843
844    pub fn network_options(&self) -> NetworkOptions {
845        self.network_options.clone()
846    }
847
848    pub fn set_network_options(&mut self, network_options: NetworkOptions) -> &mut Self {
849        self.network_options = network_options;
850        self
851    }
852
853    fn effective_read_batch_size(&self) -> usize {
854        const MAX_READ_BATCH_SIZE: usize = 128;
855        const PENDING_FAIRNESS_CAP: usize = 16;
856
857        let configured = self.mqtt_options.read_batch_size();
858        if configured > 0 {
859            return configured.clamp(1, MAX_READ_BATCH_SIZE);
860        }
861
862        let request_batch = self.mqtt_options.max_request_batch().max(1);
863        let inflight = usize::from(self.mqtt_options.inflight());
864        let mut adaptive = request_batch.max(inflight / 2).max(8);
865
866        if !self.pending.is_empty()
867            || !self.queued.is_empty()
868            || !self.requests_rx.is_empty()
869            || !self.control_requests_rx.is_empty()
870        {
871            adaptive = adaptive.min(PENDING_FAIRNESS_CAP);
872        }
873
874        adaptive.clamp(1, MAX_READ_BATCH_SIZE)
875    }
876
877    async fn try_next_request(
878        pending: &mut VecDeque<RequestEnvelope>,
879        rx: &Receiver<RequestEnvelope>,
880        pending_throttle: Duration,
881    ) -> Option<(Request, Option<TrackedNoticeTx>)> {
882        if !pending.is_empty() {
883            if pending_throttle.is_zero() {
884                tokio::task::yield_now().await;
885            } else {
886                time::sleep(pending_throttle).await;
887            }
888            // We must call .pop_front() AFTER sleep() otherwise we would have
889            // advanced the iterator but the future might be canceled before return
890            return pending.pop_front().map(RequestEnvelope::into_parts);
891        }
892
893        match rx.try_recv() {
894            Ok(envelope) => return Some(envelope.into_parts()),
895            Err(TryRecvError::Disconnected) => return None,
896            Err(TryRecvError::Empty) => {}
897        }
898
899        None
900    }
901
902    async fn next_request(
903        pending: &mut VecDeque<RequestEnvelope>,
904        rx: &Receiver<RequestEnvelope>,
905        pending_throttle: Duration,
906    ) -> Result<(Request, Option<TrackedNoticeTx>), ConnectionError> {
907        if pending.is_empty() {
908            rx.recv_async()
909                .await
910                .map(RequestEnvelope::into_parts)
911                .map_err(|_| ConnectionError::RequestsDone)
912        } else {
913            if pending_throttle.is_zero() {
914                tokio::task::yield_now().await;
915            } else {
916                time::sleep(pending_throttle).await;
917            }
918            // We must call .pop_front() AFTER sleep() otherwise we would have
919            // advanced the iterator but the future might be canceled before return
920            Ok(pending.pop_front().unwrap().into_parts())
921        }
922    }
923}
924
925fn classify_request(state: &MqttState, request: &Request) -> ScheduledRequest {
926    match request {
927        Request::Publish(publish) if publish.qos != crate::mqttbytes::QoS::AtMostOnce => {
928            ScheduledRequest {
929                class: RequestClass::FlowControlledPublish,
930                readiness: if state.can_send_publish(publish) {
931                    RequestReadiness::Ready
932                } else {
933                    RequestReadiness::Blocked
934                },
935            }
936        }
937        Request::Publish(_) => ScheduledRequest {
938            class: RequestClass::Publish,
939            readiness: RequestReadiness::Ready,
940        },
941        Request::Subscribe(_) | Request::Unsubscribe(_) => ScheduledRequest {
942            class: RequestClass::Control,
943            readiness: if state.control_packet_identifier_available() {
944                RequestReadiness::Ready
945            } else {
946                RequestReadiness::Blocked
947            },
948        },
949        _ => ScheduledRequest {
950            class: RequestClass::Control,
951            readiness: RequestReadiness::Ready,
952        },
953    }
954}
955
956const fn is_disconnect_request(request: &Request) -> bool {
957    matches!(
958        request,
959        Request::Disconnect(_) | Request::DisconnectWithTimeout(_, _) | Request::DisconnectNow(_)
960    )
961}
962
963/// This stream internally processes requests from the request stream provided to the eventloop
964/// while also consuming byte stream from the network and yielding mqtt packets as the output of
965/// the stream.
966/// This function (for convenience) includes internal delays for users to perform internal sleeps
967/// between re-connections so that cancel semantics can be used during this sleep
968async fn connect(
969    mqtt_options: &MqttOptions,
970    network_options: NetworkOptions,
971) -> Result<(Network, ConnAck), ConnectionError> {
972    // connect to the broker
973    let mut network = network_connect(mqtt_options, network_options).await?;
974
975    // make MQTT connection request (which internally awaits for ack)
976    let connack = mqtt_connect(mqtt_options, &mut network).await?;
977
978    Ok((network, connack))
979}
980
981#[allow(clippy::too_many_lines)]
982async fn network_connect(
983    options: &MqttOptions,
984    network_options: NetworkOptions,
985) -> Result<Network, ConnectionError> {
986    let transport = options.transport();
987
988    // Process Unix files early, as proxy is not supported for them.
989    #[cfg(unix)]
990    if matches!(&transport, Transport::Unix) {
991        let file = options
992            .broker()
993            .unix_path()
994            .ok_or(ConnectionError::BrokerTransportMismatch)?;
995        let socket = UnixStream::connect(Path::new(file)).await?;
996        let network = Network::new(
997            socket,
998            options.max_incoming_packet_size,
999            options.max_outgoing_packet_size,
1000        );
1001        return Ok(network);
1002    }
1003
1004    // For websockets domain and port are taken directly from the broker URL.
1005    let (domain, port) = match &transport {
1006        #[cfg(feature = "websocket")]
1007        Transport::Ws => split_url(
1008            options
1009                .broker()
1010                .websocket_url()
1011                .ok_or(ConnectionError::BrokerTransportMismatch)?,
1012        )?,
1013        #[cfg(all(
1014            any(feature = "use-rustls-no-provider", feature = "use-native-tls"),
1015            feature = "websocket"
1016        ))]
1017        Transport::Wss(_) => split_url(
1018            options
1019                .broker()
1020                .websocket_url()
1021                .ok_or(ConnectionError::BrokerTransportMismatch)?,
1022        )?,
1023        _ => options
1024            .broker()
1025            .tcp_address()
1026            .map(|(host, port)| (host.to_owned(), port))
1027            .ok_or(ConnectionError::BrokerTransportMismatch)?,
1028    };
1029
1030    let tcp_stream: Box<dyn AsyncReadWrite> = {
1031        #[cfg(feature = "proxy")]
1032        if let Some(proxy) = options.proxy() {
1033            proxy
1034                .connect(
1035                    &domain,
1036                    port,
1037                    network_options,
1038                    Some(options.effective_socket_connector()),
1039                )
1040                .await?
1041        } else {
1042            let addr = format!("{domain}:{port}");
1043            options.socket_connect(addr, network_options).await?
1044        }
1045        #[cfg(not(feature = "proxy"))]
1046        {
1047            let addr = format!("{domain}:{port}");
1048            options.socket_connect(addr, network_options).await?
1049        }
1050    };
1051
1052    let network = match transport {
1053        Transport::Tcp => Network::new(
1054            tcp_stream,
1055            options.max_incoming_packet_size,
1056            options.max_outgoing_packet_size,
1057        ),
1058        #[cfg(any(feature = "use-rustls-no-provider", feature = "use-native-tls"))]
1059        Transport::Tls(tls_config) => {
1060            let (host, port) = options
1061                .broker()
1062                .tcp_address()
1063                .expect("tls transport requires a tcp broker");
1064            let socket = tls::tls_connect(host, port, &tls_config, tcp_stream).await?;
1065            Network::new(
1066                socket,
1067                options.max_incoming_packet_size,
1068                options.max_outgoing_packet_size,
1069            )
1070        }
1071        #[cfg(unix)]
1072        Transport::Unix => unreachable!(),
1073        #[cfg(feature = "websocket")]
1074        Transport::Ws => {
1075            let mut request = options
1076                .broker()
1077                .websocket_url()
1078                .expect("ws transport requires a websocket broker")
1079                .into_client_request()?;
1080            request
1081                .headers_mut()
1082                .insert("Sec-WebSocket-Protocol", "mqtt".parse().unwrap());
1083
1084            if let Some(request_modifier) = options.fallible_request_modifier() {
1085                request = request_modifier(request)
1086                    .await
1087                    .map_err(ConnectionError::RequestModifier)?;
1088            } else if let Some(request_modifier) = options.request_modifier() {
1089                request = request_modifier(request).await;
1090            }
1091
1092            let (socket, response) =
1093                async_tungstenite::tokio::client_async(request, tcp_stream).await?;
1094            validate_response_headers(response)?;
1095
1096            Network::new(
1097                WsAdapter::new(socket),
1098                options.max_incoming_packet_size,
1099                options.max_outgoing_packet_size,
1100            )
1101        }
1102        #[cfg(all(
1103            any(feature = "use-rustls-no-provider", feature = "use-native-tls"),
1104            feature = "websocket"
1105        ))]
1106        Transport::Wss(tls_config) => {
1107            let mut request = options
1108                .broker()
1109                .websocket_url()
1110                .expect("wss transport requires a websocket broker")
1111                .into_client_request()?;
1112            request
1113                .headers_mut()
1114                .insert("Sec-WebSocket-Protocol", "mqtt".parse().unwrap());
1115
1116            if let Some(request_modifier) = options.fallible_request_modifier() {
1117                request = request_modifier(request)
1118                    .await
1119                    .map_err(ConnectionError::RequestModifier)?;
1120            } else if let Some(request_modifier) = options.request_modifier() {
1121                request = request_modifier(request).await;
1122            }
1123
1124            let tls_stream = tls::tls_connect(&domain, port, &tls_config, tcp_stream).await?;
1125            let (socket, response) =
1126                async_tungstenite::tokio::client_async(request, tls_stream).await?;
1127            validate_response_headers(response)?;
1128
1129            Network::new(
1130                WsAdapter::new(socket),
1131                options.max_incoming_packet_size,
1132                options.max_outgoing_packet_size,
1133            )
1134        }
1135    };
1136
1137    Ok(network)
1138}
1139
1140async fn mqtt_connect(
1141    options: &MqttOptions,
1142    network: &mut Network,
1143) -> Result<ConnAck, ConnectionError> {
1144    let mut connect = Connect::new(options.client_id());
1145    connect.keep_alive = u16::try_from(options.keep_alive().as_secs()).unwrap_or(u16::MAX);
1146    connect.clean_session = options.clean_session();
1147    connect.last_will = options.last_will();
1148    connect.auth = options.auth().clone();
1149
1150    // send mqtt connect packet
1151    network.write(Packet::Connect(connect)).await?;
1152    network.flush().await?;
1153
1154    // validate connack
1155    match network.read().await? {
1156        Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => Ok(connack),
1157        Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)),
1158        packet => Err(ConnectionError::NotConnAck(Box::new(packet))),
1159    }
1160}
1161
1162const fn should_replay_after_reconnect(request: &Request) -> bool {
1163    !matches!(request, Request::PubAck(_) | Request::PubRec(_))
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168    use super::*;
1169    use crate::mqttbytes::QoS;
1170    use crate::{Disconnect, PubAck, PubComp, PubRec, PubRel};
1171    use flume::TryRecvError;
1172
1173    fn push_pending(eventloop: &mut EventLoop, request: Request) {
1174        eventloop.pending.push_back(RequestEnvelope::plain(request));
1175    }
1176
1177    fn pending_front_request(eventloop: &EventLoop) -> Option<&Request> {
1178        eventloop.pending.front().map(|envelope| &envelope.request)
1179    }
1180
1181    fn build_eventloop_with_pending(clean_session: bool) -> EventLoop {
1182        let mut options = MqttOptions::new("test-client", "localhost");
1183        options.set_clean_session(clean_session);
1184
1185        let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 1);
1186        push_pending(&mut eventloop, Request::PingReq(PingReq));
1187        eventloop
1188    }
1189
1190    fn publish(qos: QoS) -> Publish {
1191        Publish::new("hello/world", qos, "payload")
1192    }
1193
1194    fn fill_publish_window(eventloop: &mut EventLoop) {
1195        let mut active = publish(QoS::AtLeastOnce);
1196        active.pkid = 1;
1197        eventloop.state.outgoing_pub[1] = Some(active);
1198        eventloop.state.inflight = 1;
1199    }
1200
1201    fn next_after_blocked_publish(request: Request) -> Option<Request> {
1202        let mut options = MqttOptions::new("test-client", "localhost");
1203        options.set_inflight(1);
1204        let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 1);
1205        fill_publish_window(&mut eventloop);
1206        eventloop
1207            .queued
1208            .push_back(RequestEnvelope::plain(Request::Publish(publish(
1209                QoS::AtLeastOnce,
1210            ))));
1211        eventloop.queued.push_back(RequestEnvelope::plain(request));
1212
1213        eventloop
1214            .next_scheduled_request()
1215            .map(|(request, _notice)| request)
1216    }
1217
1218    #[test]
1219    fn scheduler_sends_control_after_quota_blocked_publish() {
1220        let mut options = MqttOptions::new("test-client", "localhost");
1221        options.set_inflight(1);
1222        let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 1);
1223        fill_publish_window(&mut eventloop);
1224        eventloop
1225            .queued
1226            .push_back(RequestEnvelope::plain(Request::Publish(publish(
1227                QoS::AtLeastOnce,
1228            ))));
1229        eventloop
1230            .queued
1231            .push_back(RequestEnvelope::plain(Request::Subscribe(Subscribe::new(
1232                "hello/world",
1233                QoS::AtMostOnce,
1234            ))));
1235
1236        let (request, _) = eventloop.next_scheduled_request().unwrap();
1237
1238        assert!(matches!(request, Request::Subscribe(_)));
1239        match eventloop.state.handle_outgoing_packet(request).unwrap() {
1240            Some(Packet::Subscribe(subscribe)) => assert_ne!(subscribe.pkid, 1),
1241            packet => panic!("expected subscribe packet, got {packet:?}"),
1242        }
1243        assert!(matches!(
1244            eventloop
1245                .queued
1246                .drain()
1247                .next()
1248                .map(|envelope| envelope.request),
1249            Some(Request::Publish(_))
1250        ));
1251    }
1252
1253    #[test]
1254    fn scheduler_does_not_let_qos0_publish_pass_blocked_publish() {
1255        let mut options = MqttOptions::new("test-client", "localhost");
1256        options.set_inflight(1);
1257        let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 1);
1258        fill_publish_window(&mut eventloop);
1259        eventloop
1260            .queued
1261            .push_back(RequestEnvelope::plain(Request::Publish(publish(
1262                QoS::AtLeastOnce,
1263            ))));
1264        eventloop
1265            .queued
1266            .push_back(RequestEnvelope::plain(Request::Publish(publish(
1267                QoS::AtMostOnce,
1268            ))));
1269
1270        assert!(eventloop.next_scheduled_request().is_none());
1271    }
1272
1273    #[test]
1274    fn scheduler_allows_each_progress_and_control_class_after_blocked_publish() {
1275        assert!(matches!(
1276            next_after_blocked_publish(Request::PubAck(PubAck::new(7))),
1277            Some(Request::PubAck(_))
1278        ));
1279        assert!(matches!(
1280            next_after_blocked_publish(Request::PubRec(PubRec::new(8))),
1281            Some(Request::PubRec(_))
1282        ));
1283        assert!(matches!(
1284            next_after_blocked_publish(Request::PubRel(PubRel::new(9))),
1285            Some(Request::PubRel(_))
1286        ));
1287        assert!(matches!(
1288            next_after_blocked_publish(Request::PubComp(PubComp::new(10))),
1289            Some(Request::PubComp(_))
1290        ));
1291        assert!(matches!(
1292            next_after_blocked_publish(Request::PingReq(PingReq)),
1293            Some(Request::PingReq(_))
1294        ));
1295        assert!(matches!(
1296            next_after_blocked_publish(Request::Subscribe(Subscribe::new(
1297                "hello/world",
1298                QoS::AtMostOnce
1299            ))),
1300            Some(Request::Subscribe(_))
1301        ));
1302        assert!(matches!(
1303            next_after_blocked_publish(Request::Unsubscribe(Unsubscribe::new("hello/world"))),
1304            Some(Request::Unsubscribe(_))
1305        ));
1306        assert!(matches!(
1307            next_after_blocked_publish(Request::Disconnect(Disconnect)),
1308            Some(Request::Disconnect(_))
1309        ));
1310    }
1311
1312    #[test]
1313    fn scheduler_unsubscribe_after_blocked_publish_uses_non_conflicting_packet_id() {
1314        let mut options = MqttOptions::new("test-client", "localhost");
1315        options.set_inflight(1);
1316        let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 1);
1317        fill_publish_window(&mut eventloop);
1318        eventloop
1319            .queued
1320            .push_back(RequestEnvelope::plain(Request::Publish(publish(
1321                QoS::AtLeastOnce,
1322            ))));
1323        eventloop
1324            .queued
1325            .push_back(RequestEnvelope::plain(Request::Unsubscribe(
1326                Unsubscribe::new("hello/world"),
1327            )));
1328
1329        let (request, _) = eventloop.next_scheduled_request().unwrap();
1330
1331        assert!(matches!(request, Request::Unsubscribe(_)));
1332        match eventloop.state.handle_outgoing_packet(request).unwrap() {
1333            Some(Packet::Unsubscribe(unsubscribe)) => assert_ne!(unsubscribe.pkid, 1),
1334            packet => panic!("expected unsubscribe packet, got {packet:?}"),
1335        }
1336    }
1337
1338    #[test]
1339    fn scheduler_preserves_sendable_publish_before_later_control() {
1340        let (mut eventloop, _request_tx) =
1341            EventLoop::new_for_async_client(MqttOptions::new("test-client", "localhost"), 1);
1342        eventloop
1343            .queued
1344            .push_back(RequestEnvelope::plain(Request::Publish(publish(
1345                QoS::AtLeastOnce,
1346            ))));
1347        eventloop
1348            .queued
1349            .push_back(RequestEnvelope::plain(Request::Subscribe(Subscribe::new(
1350                "hello/world",
1351                QoS::AtMostOnce,
1352            ))));
1353
1354        let (request, _) = eventloop.next_scheduled_request().unwrap();
1355
1356        assert!(matches!(request, Request::Publish(_)));
1357    }
1358
1359    #[tokio::test]
1360    async fn select_admits_control_request_after_ready_publish_backlog_snapshot() {
1361        let mut options = MqttOptions::new("test-client", "localhost");
1362        options.set_max_request_batch(1);
1363        let (mut eventloop, request_tx, control_request_tx, _immediate_disconnect_tx) =
1364            EventLoop::new_for_async_client_with_capacity(
1365                options,
1366                RequestChannelCapacity::Unbounded,
1367            );
1368        let (client, _peer) = tokio::io::duplex(1024);
1369        eventloop.network = Some(Network::new(client, 1024, 1024));
1370
1371        request_tx
1372            .send_async(RequestEnvelope::plain(Request::Publish(publish(
1373                QoS::AtMostOnce,
1374            ))))
1375            .await
1376            .unwrap();
1377        request_tx
1378            .send_async(RequestEnvelope::plain(Request::Publish(publish(
1379                QoS::AtMostOnce,
1380            ))))
1381            .await
1382            .unwrap();
1383        control_request_tx
1384            .send_async(RequestEnvelope::plain(Request::Disconnect(Disconnect)))
1385            .await
1386            .unwrap();
1387
1388        let first = time::timeout(Duration::from_secs(1), eventloop.select())
1389            .await
1390            .expect("timed out waiting for first request")
1391            .expect("select should not fail");
1392        request_tx
1393            .send_async(RequestEnvelope::plain(Request::Publish(publish(
1394                QoS::AtMostOnce,
1395            ))))
1396            .await
1397            .unwrap();
1398        let second = time::timeout(Duration::from_secs(1), eventloop.select())
1399            .await
1400            .expect("timed out waiting for second request")
1401            .expect("select should not fail");
1402        let third = time::timeout(Duration::from_secs(1), eventloop.select())
1403            .await
1404            .expect("timed out waiting for control request")
1405            .expect("select should not fail");
1406
1407        assert!(matches!(first, Event::Outgoing(Outgoing::Publish(_))));
1408        assert!(matches!(second, Event::Outgoing(Outgoing::Publish(_))));
1409        assert!(matches!(third, Event::Outgoing(Outgoing::Disconnect)));
1410    }
1411
1412    #[test]
1413    fn eventloop_new_keeps_internal_sender_alive() {
1414        let options = MqttOptions::new("test-client", "localhost");
1415        let eventloop = EventLoop::new(options, 1);
1416
1417        assert!(matches!(
1418            eventloop.requests_rx.try_recv(),
1419            Err(TryRecvError::Empty)
1420        ));
1421    }
1422
1423    #[test]
1424    fn async_client_constructor_path_allows_channel_shutdown() {
1425        let options = MqttOptions::new("test-client", "localhost");
1426        let (eventloop, request_tx) = EventLoop::new_for_async_client(options, 1);
1427        drop(request_tx);
1428
1429        assert!(matches!(
1430            eventloop.requests_rx.try_recv(),
1431            Err(TryRecvError::Disconnected)
1432        ));
1433    }
1434
1435    #[test]
1436    fn clean_drops_ack_requests_drained_from_channel() {
1437        let options = MqttOptions::new("test-client", "localhost");
1438        let (mut eventloop, request_tx) = EventLoop::new_for_async_client(options, 3);
1439        request_tx
1440            .send(RequestEnvelope::plain(Request::PubAck(PubAck::new(7))))
1441            .unwrap();
1442        request_tx
1443            .send(RequestEnvelope::plain(Request::PubRec(PubRec::new(8))))
1444            .unwrap();
1445        request_tx
1446            .send(RequestEnvelope::plain(Request::PingReq(PingReq)))
1447            .unwrap();
1448
1449        eventloop.clean();
1450
1451        assert_eq!(eventloop.pending_len(), 1);
1452        assert!(matches!(
1453            pending_front_request(&eventloop),
1454            Some(Request::PingReq(_))
1455        ));
1456    }
1457
1458    #[test]
1459    fn clean_drops_ack_requests_drained_from_queued_scheduler() {
1460        let options = MqttOptions::new("test-client", "localhost");
1461        let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 3);
1462        eventloop
1463            .queued
1464            .push_back(RequestEnvelope::plain(Request::PubAck(PubAck::new(7))));
1465        eventloop
1466            .queued
1467            .push_back(RequestEnvelope::plain(Request::PubRec(PubRec::new(8))));
1468        eventloop
1469            .queued
1470            .push_back(RequestEnvelope::plain(Request::PingReq(PingReq)));
1471
1472        eventloop.clean();
1473
1474        assert_eq!(eventloop.pending_len(), 1);
1475        assert!(matches!(
1476            pending_front_request(&eventloop),
1477            Some(Request::PingReq(_))
1478        ));
1479    }
1480
1481    #[tokio::test]
1482    #[cfg(unix)]
1483    async fn network_connect_rejects_unix_broker_with_tcp_transport() {
1484        let mut options = MqttOptions::new("test-client", crate::Broker::unix("/tmp/mqtt.sock"));
1485        options.set_transport(Transport::tcp());
1486
1487        match network_connect(&options, NetworkOptions::new()).await {
1488            Err(ConnectionError::BrokerTransportMismatch) => {}
1489            Err(err) => panic!("unexpected error: {err:?}"),
1490            Ok(_) => panic!("mismatched broker and transport should fail"),
1491        }
1492    }
1493
1494    #[tokio::test]
1495    #[cfg(feature = "websocket")]
1496    async fn network_connect_rejects_tcp_broker_with_websocket_transport() {
1497        let mut options = MqttOptions::new("test-client", "localhost");
1498        options.set_transport(Transport::Ws);
1499
1500        match network_connect(&options, NetworkOptions::new()).await {
1501            Err(ConnectionError::BrokerTransportMismatch) => {}
1502            Err(err) => panic!("unexpected error: {err:?}"),
1503            Ok(_) => panic!("mismatched broker and transport should fail"),
1504        }
1505    }
1506
1507    #[tokio::test]
1508    #[cfg(feature = "websocket")]
1509    async fn network_connect_rejects_websocket_broker_with_tcp_transport() {
1510        let broker = crate::Broker::websocket("ws://localhost:9001/mqtt").unwrap();
1511        let mut options = MqttOptions::new("test-client", broker);
1512        options.set_transport(Transport::tcp());
1513
1514        match network_connect(&options, NetworkOptions::new()).await {
1515            Err(ConnectionError::BrokerTransportMismatch) => {}
1516            Err(err) => panic!("unexpected error: {err:?}"),
1517            Ok(_) => panic!("mismatched broker and transport should fail"),
1518        }
1519    }
1520
1521    #[tokio::test]
1522    async fn async_client_path_reports_requests_done_after_pending_drain() {
1523        let options = MqttOptions::new("test-client", "localhost");
1524        let (mut eventloop, request_tx) = EventLoop::new_for_async_client(options, 1);
1525        push_pending(&mut eventloop, Request::PingReq(PingReq));
1526        drop(request_tx);
1527
1528        let request = EventLoop::next_request(
1529            &mut eventloop.pending,
1530            &eventloop.requests_rx,
1531            Duration::ZERO,
1532        )
1533        .await
1534        .unwrap();
1535        assert!(matches!(request, (Request::PingReq(_), None)));
1536
1537        let err = EventLoop::next_request(
1538            &mut eventloop.pending,
1539            &eventloop.requests_rx,
1540            Duration::ZERO,
1541        )
1542        .await
1543        .unwrap_err();
1544        assert!(matches!(err, ConnectionError::RequestsDone));
1545    }
1546
1547    #[tokio::test]
1548    async fn next_request_is_cancellation_safe_for_pending_queue() {
1549        let options = MqttOptions::new("test-client", "localhost");
1550        let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 1);
1551        push_pending(&mut eventloop, Request::PingReq(PingReq));
1552
1553        let delayed = EventLoop::next_request(
1554            &mut eventloop.pending,
1555            &eventloop.requests_rx,
1556            Duration::from_millis(50),
1557        );
1558        let timed_out = time::timeout(Duration::from_millis(5), delayed).await;
1559
1560        assert!(timed_out.is_err());
1561        assert!(matches!(
1562            pending_front_request(&eventloop),
1563            Some(Request::PingReq(_))
1564        ));
1565    }
1566
1567    #[tokio::test]
1568    async fn try_next_request_applies_pending_throttle_for_followup_pending_item() {
1569        let options = MqttOptions::new("test-client", "localhost");
1570        let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 2);
1571        push_pending(&mut eventloop, Request::PingReq(PingReq));
1572        push_pending(&mut eventloop, Request::Disconnect(Disconnect));
1573
1574        let first = EventLoop::next_request(
1575            &mut eventloop.pending,
1576            &eventloop.requests_rx,
1577            Duration::ZERO,
1578        )
1579        .await
1580        .unwrap();
1581        assert!(matches!(first, (Request::PingReq(_), None)));
1582
1583        let delayed = EventLoop::try_next_request(
1584            &mut eventloop.pending,
1585            &eventloop.requests_rx,
1586            Duration::from_millis(50),
1587        );
1588        let timed_out = time::timeout(Duration::from_millis(5), delayed).await;
1589
1590        assert!(timed_out.is_err());
1591        assert!(matches!(
1592            pending_front_request(&eventloop),
1593            Some(Request::Disconnect(_))
1594        ));
1595    }
1596
1597    #[tokio::test]
1598    async fn try_next_request_does_not_throttle_when_pending_queue_is_empty() {
1599        let options = MqttOptions::new("test-client", "localhost");
1600        let (mut eventloop, request_tx) = EventLoop::new_for_async_client(options, 1);
1601        request_tx
1602            .send_async(RequestEnvelope::plain(Request::PingReq(PingReq)))
1603            .await
1604            .unwrap();
1605
1606        let received = time::timeout(
1607            Duration::from_millis(20),
1608            EventLoop::try_next_request(
1609                &mut eventloop.pending,
1610                &eventloop.requests_rx,
1611                Duration::from_secs(1),
1612            ),
1613        )
1614        .await
1615        .unwrap();
1616
1617        assert!(matches!(received, Some((Request::PingReq(_), None))));
1618    }
1619
1620    #[tokio::test]
1621    async fn next_request_prioritizes_pending_over_channel_messages() {
1622        let options = MqttOptions::new("test-client", "localhost");
1623        let (mut eventloop, request_tx) = EventLoop::new_for_async_client(options, 2);
1624        push_pending(&mut eventloop, Request::PingReq(PingReq));
1625        request_tx
1626            .send_async(RequestEnvelope::plain(Request::PingReq(PingReq)))
1627            .await
1628            .unwrap();
1629
1630        let first = EventLoop::next_request(
1631            &mut eventloop.pending,
1632            &eventloop.requests_rx,
1633            Duration::ZERO,
1634        )
1635        .await
1636        .unwrap();
1637        assert!(matches!(first, (Request::PingReq(_), None)));
1638        assert!(eventloop.pending.is_empty());
1639
1640        let second = EventLoop::next_request(
1641            &mut eventloop.pending,
1642            &eventloop.requests_rx,
1643            Duration::ZERO,
1644        )
1645        .await
1646        .unwrap();
1647        assert!(matches!(second, (Request::PingReq(_), None)));
1648    }
1649
1650    #[tokio::test]
1651    async fn next_request_preserves_fifo_order_for_plain_and_tracked_requests() {
1652        let options = MqttOptions::new("test-client", "localhost");
1653        let (mut eventloop, request_tx) = EventLoop::new_for_async_client(options, 4);
1654        let (notice_tx, _notice) = PublishNoticeTx::new();
1655        let tracked_publish =
1656            Publish::new("hello/world", crate::mqttbytes::QoS::AtLeastOnce, "payload");
1657
1658        request_tx
1659            .send_async(RequestEnvelope::plain(Request::PingReq(PingReq)))
1660            .await
1661            .unwrap();
1662        request_tx
1663            .send_async(RequestEnvelope::tracked_publish(
1664                tracked_publish.clone(),
1665                notice_tx,
1666            ))
1667            .await
1668            .unwrap();
1669        request_tx
1670            .send_async(RequestEnvelope::plain(Request::Disconnect(Disconnect)))
1671            .await
1672            .unwrap();
1673
1674        let first = EventLoop::next_request(
1675            &mut eventloop.pending,
1676            &eventloop.requests_rx,
1677            Duration::ZERO,
1678        )
1679        .await
1680        .unwrap();
1681        assert!(matches!(first, (Request::PingReq(_), None)));
1682
1683        let second = EventLoop::next_request(
1684            &mut eventloop.pending,
1685            &eventloop.requests_rx,
1686            Duration::ZERO,
1687        )
1688        .await
1689        .unwrap();
1690        assert!(matches!(
1691            second,
1692            (Request::Publish(publish), Some(_)) if publish == tracked_publish
1693        ));
1694
1695        let third = EventLoop::next_request(
1696            &mut eventloop.pending,
1697            &eventloop.requests_rx,
1698            Duration::ZERO,
1699        )
1700        .await
1701        .unwrap();
1702        assert!(matches!(third, (Request::Disconnect(_), None)));
1703    }
1704
1705    #[tokio::test]
1706    async fn tracked_qos0_notice_reports_not_flushed_on_first_write_failure() {
1707        let options = MqttOptions::new("test-client", "localhost");
1708        let (mut eventloop, request_tx) = EventLoop::new_for_async_client(options, 4);
1709        let (client, _peer) = tokio::io::duplex(1024);
1710        eventloop.network = Some(Network::new(client, 1024, 16));
1711
1712        let (notice_tx, notice) = PublishNoticeTx::new();
1713        let publish = Publish::new(
1714            "hello/world",
1715            crate::mqttbytes::QoS::AtMostOnce,
1716            vec![1; 128],
1717        );
1718        request_tx
1719            .send_async(RequestEnvelope::tracked_publish(publish, notice_tx))
1720            .await
1721            .unwrap();
1722
1723        let err = eventloop.select().await.unwrap_err();
1724        assert!(matches!(err, ConnectionError::MqttState(_)));
1725        assert_eq!(
1726            notice.wait_async().await.unwrap_err(),
1727            PublishNoticeError::Qos0NotFlushed
1728        );
1729    }
1730
1731    #[tokio::test]
1732    async fn tracked_qos0_notices_report_not_flushed_on_batched_write_failure() {
1733        let mut options = MqttOptions::new("test-client", "localhost");
1734        options.set_max_request_batch(2);
1735        let (mut eventloop, request_tx) = EventLoop::new_for_async_client(options, 4);
1736        let (client, _peer) = tokio::io::duplex(1024);
1737        eventloop.network = Some(Network::new(client, 1024, 80));
1738
1739        let small_publish = Publish::new("hello/world", crate::mqttbytes::QoS::AtMostOnce, vec![1]);
1740        let large_publish = Publish::new(
1741            "hello/world",
1742            crate::mqttbytes::QoS::AtMostOnce,
1743            vec![2; 256],
1744        );
1745
1746        let (first_notice_tx, first_notice) = PublishNoticeTx::new();
1747        request_tx
1748            .send_async(RequestEnvelope::tracked_publish(
1749                small_publish,
1750                first_notice_tx,
1751            ))
1752            .await
1753            .unwrap();
1754
1755        let (second_notice_tx, second_notice) = PublishNoticeTx::new();
1756        request_tx
1757            .send_async(RequestEnvelope::tracked_publish(
1758                large_publish,
1759                second_notice_tx,
1760            ))
1761            .await
1762            .unwrap();
1763
1764        let err = eventloop.select().await.unwrap_err();
1765        assert!(matches!(err, ConnectionError::MqttState(_)));
1766        assert_eq!(
1767            first_notice.wait_async().await.unwrap_err(),
1768            PublishNoticeError::Qos0NotFlushed
1769        );
1770        assert_eq!(
1771            second_notice.wait_async().await.unwrap_err(),
1772            PublishNoticeError::Qos0NotFlushed
1773        );
1774    }
1775
1776    #[tokio::test]
1777    async fn drain_pending_as_failed_drains_all_and_returns_count() {
1778        let options = MqttOptions::new("test-client", "localhost");
1779        let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 1);
1780        let (notice_tx, notice) = PublishNoticeTx::new();
1781        let publish = Publish::new("hello/world", crate::mqttbytes::QoS::AtLeastOnce, "payload");
1782        eventloop
1783            .pending
1784            .push_back(RequestEnvelope::tracked_publish(publish, notice_tx));
1785        eventloop
1786            .pending
1787            .push_back(RequestEnvelope::plain(Request::PingReq(PingReq)));
1788
1789        let drained = eventloop.drain_pending_as_failed(NoticeFailureReason::SessionReset);
1790
1791        assert_eq!(drained, 2);
1792        assert!(eventloop.pending.is_empty());
1793        assert_eq!(
1794            notice.wait_async().await.unwrap_err(),
1795            PublishNoticeError::SessionReset
1796        );
1797    }
1798
1799    #[tokio::test]
1800    async fn drain_pending_as_failed_reports_session_reset_for_tracked_notices() {
1801        let options = MqttOptions::new("test-client", "localhost");
1802        let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 1);
1803        let (publish_notice_tx, publish_notice) = PublishNoticeTx::new();
1804        let publish = Publish::new("hello/world", crate::mqttbytes::QoS::AtLeastOnce, "payload");
1805        eventloop
1806            .pending
1807            .push_back(RequestEnvelope::tracked_publish(publish, publish_notice_tx));
1808
1809        let (request_notice_tx, request_notice) = SubscribeNoticeTx::new();
1810        let subscribe = Subscribe::new("hello/world", crate::mqttbytes::QoS::AtMostOnce);
1811        eventloop
1812            .pending
1813            .push_back(RequestEnvelope::tracked_subscribe(
1814                subscribe,
1815                request_notice_tx,
1816            ));
1817
1818        eventloop.drain_pending_as_failed(NoticeFailureReason::SessionReset);
1819
1820        assert_eq!(
1821            publish_notice.wait_async().await.unwrap_err(),
1822            PublishNoticeError::SessionReset
1823        );
1824        assert_eq!(
1825            request_notice.wait_async().await.unwrap_err(),
1826            crate::SubscribeNoticeError::SessionReset
1827        );
1828    }
1829
1830    #[tokio::test]
1831    async fn reset_session_state_reports_session_reset_for_pending_tracked_notice() {
1832        let options = MqttOptions::new("test-client", "localhost");
1833        let (mut eventloop, _request_tx) = EventLoop::new_for_async_client(options, 1);
1834        let (notice_tx, notice) = PublishNoticeTx::new();
1835        let publish = Publish::new("hello/world", crate::mqttbytes::QoS::AtLeastOnce, "payload");
1836        eventloop
1837            .pending
1838            .push_back(RequestEnvelope::tracked_publish(publish, notice_tx));
1839
1840        eventloop.reset_session_state();
1841
1842        assert!(eventloop.pending.is_empty());
1843        assert_eq!(
1844            notice.wait_async().await.unwrap_err(),
1845            PublishNoticeError::SessionReset
1846        );
1847    }
1848
/// Reconciling with `session_present = true` on an event loop built with
/// `true` is rejected as a session-state mismatch, and the pending entry
/// is left in place.
#[test]
fn connack_reconcile_rejects_clean_session_with_session_present() {
    let mut eventloop = build_eventloop_with_pending(true);

    let outcome = eventloop.reconcile_connack_session(true);

    assert!(matches!(
        outcome,
        Err(ConnectionError::SessionStateMismatch {
            clean_session: true,
            session_present: true
        })
    ));
    // The rejected CONNACK must not have touched the queue.
    assert_eq!(eventloop.pending_len(), 1);
}
1864
/// An event loop built with `true` that reconciles a CONNACK without a
/// present session succeeds and ends up with an empty pending queue.
#[test]
fn connack_reconcile_resets_pending_when_clean_session_gets_new_session() {
    let mut eventloop = build_eventloop_with_pending(true);

    let reconciled = eventloop.reconcile_connack_session(false);

    reconciled.unwrap();
    assert!(eventloop.pending_is_empty());
}
1873
/// An event loop built with `false` that reconciles a CONNACK without a
/// present session succeeds and ends up with an empty pending queue.
#[test]
fn connack_reconcile_resets_pending_when_resumed_session_is_missing() {
    let mut eventloop = build_eventloop_with_pending(false);

    let reconciled = eventloop.reconcile_connack_session(false);

    reconciled.unwrap();
    assert!(eventloop.pending_is_empty());
}
1882
/// An event loop built with `false` that reconciles a CONNACK with a
/// present session succeeds and keeps its single pending entry.
#[test]
fn connack_reconcile_keeps_pending_when_resumed_session_exists() {
    let mut eventloop = build_eventloop_with_pending(false);

    let reconciled = eventloop.reconcile_connack_session(true);

    reconciled.unwrap();
    assert_eq!(eventloop.pending_len(), 1);
}
1891}