Skip to main content

rust_mqtt/client/
mod.rs

1//! Implements full client functionality with session and configuration handling and Quality of Service flows.
2
3use core::num::NonZero;
4
5use heapless::Vec;
6
7use crate::{
8    buffer::BufferProvider,
9    bytes::Bytes,
10    client::{
11        event::{Event, Puback, Publish, Pubrej, Suback},
12        info::ConnectInfo,
13        options::{
14            ConnectOptions, DisconnectOptions, PublicationOptions, SubscriptionOptions,
15            TopicReference,
16        },
17        raw::Raw,
18    },
19    config::{ClientConfig, MaximumPacketSize, ServerConfig, SessionExpiryInterval, SharedConfig},
20    fmt::{assert, debug, error, info, panic, trace, unreachable, warn},
21    header::{FixedHeader, PacketType},
22    io::Transport,
23    packet::{Packet, TxPacket},
24    session::{CPublishFlightState, SPublishFlightState, Session},
25    types::{
26        IdentifiedQoS, MqttBinary, MqttString, PacketIdentifier, QoS, ReasonCode,
27        SubscriptionFilter, TopicFilter, TopicName, VarByteInt,
28    },
29    v5::{
30        packet::{
31            ConnackPacket, ConnectPacket, DisconnectPacket, PingreqPacket, PingrespPacket,
32            PubackPacket, PubcompPacket, PublishPacket, PubrecPacket, PubrelPacket, SubackPacket,
33            SubscribePacket, UnsubackPacket, UnsubscribePacket,
34        },
35        property::Property,
36    },
37};
38
39mod err;
40
41pub mod event;
42pub mod info;
43pub mod options;
44pub mod raw;
45
46pub use err::Error as MqttError;
47
48/// An MQTT client.
49///
50/// Configuration via const parameters:
51/// - `MAX_SUBSCRIBES`: The maximum amount of in-flight/unacknowledged SUBSCRIBE packets (one per call to [`Self::subscribe`]).
52/// - `RECEIVE_MAXIMUM`: MQTT's control flow mechanism. The maximum amount of incoming [`QoS::AtLeastOnce`] and
53///   [`QoS::ExactlyOnce`] publications (accumulated). Must not be 0 and must not be greater than 65535.
54/// - `SEND_MAXIMUM`: The maximum amount of outgoing [`QoS::AtLeastOnce`] and [`QoS::ExactlyOnce`] publications. The server
55///   can further limit this with its receive maximum. The client will use the minimum of this value and [`Self::server_config`].
56/// - `MAX_SUBSCRIPTION_IDENTIFIERS`: The maximum amount of subscription identifier properties the client can receive within a
57///   single PUBLISH packet. If a packet with more subscription identifiers is received, the later identifers will be discarded.
58#[derive(Debug)]
59#[cfg_attr(feature = "defmt", derive(defmt::Format))]
60pub struct Client<
61    'c,
62    N: Transport,
63    B: BufferProvider<'c>,
64    const MAX_SUBSCRIBES: usize,
65    const RECEIVE_MAXIMUM: usize,
66    const SEND_MAXIMUM: usize,
67    const MAX_SUBSCRIPTION_IDENTIFIERS: usize,
68> {
69    client_config: ClientConfig,
70    shared_config: SharedConfig,
71    server_config: ServerConfig,
72    session: Session<RECEIVE_MAXIMUM, SEND_MAXIMUM>,
73
74    raw: Raw<'c, N, B>,
75
76    packet_identifier_counter: PacketIdentifier,
77
78    /// sent SUBSCRIBE packets
79    pending_suback: Vec<PacketIdentifier, MAX_SUBSCRIBES>,
80    /// sent UNSUBSCRIBE packets
81    pending_unsuback: Vec<PacketIdentifier, MAX_SUBSCRIBES>,
82}
83
84impl<
85    'c,
86    N: Transport,
87    B: BufferProvider<'c>,
88    const MAX_SUBSCRIBES: usize,
89    const RECEIVE_MAXIMUM: usize,
90    const SEND_MAXIMUM: usize,
91    const MAX_SUBSCRIPTION_IDENTIFIERS: usize,
92> Client<'c, N, B, MAX_SUBSCRIBES, RECEIVE_MAXIMUM, SEND_MAXIMUM, MAX_SUBSCRIPTION_IDENTIFIERS>
93{
94    /// Creates a new, disconnected MQTT client using a buffer provider to store
95    /// dynamically sized fields of received packets.
96    /// The session state is initialised as a new session. If you want to start the
97    /// client with an existing session, use [`Self::with_session`].
98    pub fn new(buffer: &'c mut B) -> Self {
99        assert!(
100            RECEIVE_MAXIMUM <= 65535,
101            "RECEIVE_MAXIMUM must be less than or equal to 65535"
102        );
103        assert!(
104            RECEIVE_MAXIMUM > 0,
105            "RECEIVE_MAXIMUM must be greater than 0"
106        );
107
108        Self {
109            client_config: ClientConfig::default(),
110            shared_config: SharedConfig::default(),
111            server_config: ServerConfig::default(),
112            session: Session::default(),
113
114            raw: Raw::new_disconnected(buffer),
115
116            packet_identifier_counter: PacketIdentifier::ONE,
117
118            pending_suback: Vec::new(),
119            pending_unsuback: Vec::new(),
120        }
121    }
122
123    /// Creates a new, disconnected MQTT client using a buffer provider to store
124    /// dynamically sized fields of received packets.
125    pub fn with_session(
126        session: Session<RECEIVE_MAXIMUM, SEND_MAXIMUM>,
127        buffer: &'c mut B,
128    ) -> Self {
129        let mut s = Self::new(buffer);
130        s.session = session;
131        s
132    }
133
134    /// Returns the amount of publications the client is allowed to make according to the server's
135    /// receive maximum. Does not account local space for storing publication state.
136    fn remaining_send_quota(&self) -> u16 {
137        self.server_config.receive_maximum.into_inner().get() - self.session.in_flight_cpublishes()
138    }
139
140    fn is_packet_identifier_used(&self, packet_identifier: PacketIdentifier) -> bool {
141        self.session
142            .is_used_cpublish_packet_identifier(packet_identifier)
143            || self.pending_suback.contains(&packet_identifier)
144            || self.pending_unsuback.contains(&packet_identifier)
145    }
146
147    /// Returns configuration for this client.
148    #[inline]
149    pub fn client_config(&self) -> &ClientConfig {
150        &self.client_config
151    }
152
153    /// Returns the configuration of the currently or last connected server if there is one.
154    #[inline]
155    pub fn server_config(&self) -> &ServerConfig {
156        &self.server_config
157    }
158
159    /// Returns the configuration negotiated between the client and server.
160    #[inline]
161    pub fn shared_config(&self) -> &SharedConfig {
162        &self.shared_config
163    }
164
165    /// Returns session related configuration and tracking information.
166    #[inline]
167    pub fn session(&self) -> &Session<RECEIVE_MAXIMUM, SEND_MAXIMUM> {
168        &self.session
169    }
170
171    /// Returns an immutable reference to the supplied [`BufferProvider`] implementation.
172    #[inline]
173    pub fn buffer(&self) -> &B {
174        self.raw.buffer()
175    }
176
177    /// Returns a mutable reference to the supplied [`BufferProvider`] implementation.
178    ///
179    /// This can for example be used to reset the underlying buffer if using `BumpBuffer`.
180    #[inline]
181    pub fn buffer_mut(&mut self) -> &mut B {
182        self.raw.buffer_mut()
183    }
184
185    /// Generates a new packet identifier.
186    fn packet_identifier(&mut self) -> PacketIdentifier {
187        loop {
188            let packet_identifier = self.packet_identifier_counter;
189
190            self.packet_identifier_counter = packet_identifier.next();
191
192            if !self.is_packet_identifier_used(packet_identifier) {
193                break packet_identifier;
194            }
195        }
196    }
197
198    /// Returns true if the packet identifier exists.
199    fn remove_packet_identifier_if_exists<const M: usize>(
200        vec: &mut Vec<PacketIdentifier, M>,
201        pid: PacketIdentifier,
202    ) -> bool {
203        if let Some(i) = vec.iter().position(|p| *p == pid) {
204            // Safety: The index has just been found in the vector
205            unsafe { vec.swap_remove_unchecked(i) };
206            true
207        } else {
208            false
209        }
210    }
211
212    /// Connect the client to an MQTT server on the other end of the `net` argument.
213    /// Sends a CONNECT message and awaits the CONNACK response by the server.
214    ///
215    /// Only call this when
216    /// - the client is newly constructed.
217    /// - a non-recoverable error has occured and [`Self::abort`] has been called.
218    /// - [`Self::disconnect`] has been called.
219    ///
220    /// The session expiry interval in [`ConnectOptions`] overrides the one in the session of the client.
221    ///
222    /// Configuration that was negotiated with the server is stored in the `client_config`,
223    /// `server_config`, `shared_config`, and `session` fields, which have getters
224    /// ([`Self::client_config`], [`Self::server_config`], [`Self::shared_config`],
225    /// [`Self::session`]).
226    ///
227    /// If the server does not have a session present, the client's session is cleared. In case you would want
228    /// to keep the session state, you can call [`Self::session`] and clone the session before.
229    ///
230    /// # Returns:
231    /// Information about the session/connection that the client does currently not use and therefore  not store
232    /// in its configuration fields.
233    ///
234    /// # Errors
235    ///
236    /// * [`MqttError::Server`] if:
237    ///   * the server sends a malformed packet
238    ///   * the first received packet is something other than a CONNACK packet
239    ///   * `client_identifier` is [`None`] and the server did not assign a client identifier
240    ///   * the server causes a protocol error
241    /// * [`MqttError::Disconnect`] if the CONNACK packet's reason code is not successful (>= 0x80)
242    /// * [`MqttError::Network`] if the underlying [`Transport`] returned an error
243    /// * [`MqttError::Alloc`] if the underlying [`BufferProvider`] returned an error
244    pub async fn connect<'d>(
245        &mut self,
246        net: N,
247        options: &ConnectOptions<'_>,
248        client_identifier: Option<MqttString<'d>>,
249    ) -> Result<ConnectInfo<'d>, MqttError<'c>>
250    where
251        'c: 'd,
252    {
253        if options.clean_start {
254            self.session.clear();
255        }
256
257        self.pending_suback.clear();
258        self.pending_unsuback.clear();
259
260        self.raw.set_net(net);
261
262        // Set client session expiry interval because it is relevant to determine
263        // which session expiry interval can be sent in DISCONNECT packet.
264        self.client_config.session_expiry_interval = options.session_expiry_interval;
265
266        // Empirical maximum packet size mapping
267        // -------------------------------------------------------------------------------------------------------
268        //         remaining length              | fixed header length |              max packet size
269        //                               0..=127 |                   2 |                                   2..=129
270        //                          128..=16_383 |                   3 |                              131..=16_386
271        //                    16_384..=2_097_151 |                   4 |                        16_388..=2_097_155
272        // 2_097_152..=VarByteInt::MAX_ENCODABLE |                   5 | 2_097_157..=(VarByteInt::MAX_ENCODABLE+5)
273
274        const MAX_POSSIBLE_PACKET_SIZE: u32 = VarByteInt::MAX_ENCODABLE + 5;
275
276        self.client_config.maximum_accepted_remaining_length = match options.maximum_packet_size {
277            MaximumPacketSize::Unlimited => u32::MAX,
278            MaximumPacketSize::Limit(l) => match l.get() {
279                0 => unreachable!("NonZero invariant"),
280                1 => panic!(
281                    "Every MQTT packet is at least 2 bytes long, a smaller maximum packet size makes no sense"
282                ),
283                2..=129 => l.get() - 2,
284                130..=16_386 => l.get() - 3,
285                16_387..=2_097_155 => l.get() - 4,
286                2_097_156..MAX_POSSIBLE_PACKET_SIZE => l.get() - 5,
287                MAX_POSSIBLE_PACKET_SIZE.. => VarByteInt::MAX_ENCODABLE,
288            },
289        };
290
291        trace!(
292            "maximum accepted remaining length set to {:?}",
293            self.client_config.maximum_accepted_remaining_length
294        );
295
296        {
297            let packet_client_identifier = client_identifier
298                .as_ref()
299                .map(MqttString::as_borrowed)
300                .unwrap_or_default();
301
302            let mut packet = ConnectPacket::new(
303                packet_client_identifier,
304                options.clean_start,
305                options.keep_alive,
306                options.maximum_packet_size,
307                options.session_expiry_interval,
308                // Safety: `Self::new` panics if `RECEIVE_MAXIMUM` is 0. Thus, this
309                // code is only reached when `RECEIVE_MAXIMUM` is greater than 0.
310                unsafe { NonZero::new_unchecked(RECEIVE_MAXIMUM as u16) },
311                options.request_response_information,
312            );
313
314            if let Some(ref user_name) = options.user_name {
315                packet.add_user_name(user_name.as_borrowed());
316            }
317            if let Some(ref password) = options.password {
318                packet.add_password(password.as_borrowed());
319            }
320
321            if let Some(ref will) = options.will {
322                let will_qos = will.will_qos;
323                let will_retain = will.will_retain;
324
325                packet.add_will(will.as_borrowed_will(), will_qos, will_retain);
326            }
327
328            debug!("sending CONNECT packet");
329            self.raw.send(&packet).await?;
330            self.raw.flush().await?;
331        }
332
333        let header = self.raw.recv_header().await?;
334
335        match header.packet_type() {
336            Ok(ConnackPacket::PACKET_TYPE) => debug!(
337                "received CONNACK packet header (remaining length: {})",
338                header.remaining_len.value()
339            ),
340            Ok(t) => {
341                error!("received unexpected {:?} packet header", t);
342
343                self.raw.close_with(Some(ReasonCode::ProtocolError));
344                return Err(MqttError::Server);
345            }
346            Err(_) => {
347                error!("received invalid header {:?}", header);
348                self.raw.close_with(Some(ReasonCode::MalformedPacket));
349                return Err(MqttError::Server);
350            }
351        }
352
353        let ConnackPacket {
354            session_present,
355            reason_code,
356            session_expiry_interval,
357            receive_maximum,
358            maximum_qos,
359            retain_available,
360            maximum_packet_size,
361            assigned_client_identifier,
362            topic_alias_maximum,
363            reason_string,
364            wildcard_subscription_available,
365            subscription_identifier_available,
366            shared_subscription_available,
367            server_keep_alive,
368            response_information,
369            server_reference,
370        } = self.raw.recv_body(&header).await?;
371
372        if reason_code.is_success() {
373            debug!("CONNACK packet indicates success");
374
375            if !session_present && !options.clean_start {
376                info!("server does not have the requested session present.");
377                self.session.clear();
378            }
379
380            let client_identifier = assigned_client_identifier
381                .map(Property::into_inner)
382                .or(client_identifier)
383                .ok_or_else(|| {
384                    error!("server did not assign a client identifier when it was required.");
385                    self.raw.close_with(Some(ReasonCode::ProtocolError));
386                    MqttError::Server
387                })?;
388
389            self.shared_config.session_expiry_interval =
390                session_expiry_interval.unwrap_or(options.session_expiry_interval);
391            self.shared_config.keep_alive =
392                server_keep_alive.map_or(options.keep_alive, Property::into_inner);
393
394            if let Some(r) = receive_maximum {
395                self.server_config.receive_maximum = r;
396            }
397            if let Some(m) = maximum_qos {
398                self.server_config.maximum_qos = m.into_inner();
399            }
400            if let Some(r) = retain_available {
401                self.server_config.retain_supported = r.into_inner();
402            }
403            if let Some(m) = maximum_packet_size {
404                self.server_config.maximum_packet_size = m;
405            }
406            if let Some(t) = topic_alias_maximum {
407                self.server_config.topic_alias_maximum = t.into_inner();
408            }
409            if let Some(w) = wildcard_subscription_available {
410                self.server_config.wildcard_subscription_supported = w.into_inner();
411            }
412            if let Some(s) = subscription_identifier_available {
413                self.server_config.subscription_identifiers_supported = s.into_inner();
414            }
415            if let Some(s) = shared_subscription_available {
416                self.server_config.shared_subscription_supported = s.into_inner();
417            }
418
419            info!("connected to server (session present: {})", session_present);
420
421            Ok(ConnectInfo {
422                session_present,
423                client_identifier,
424                response_information: response_information.map(Property::into_inner),
425                server_reference: server_reference.map(Property::into_inner),
426            })
427        } else {
428            debug!("CONNACK packet indicates rejection");
429            info!("connection rejected by server (reason: {:?})", reason_code);
430
431            self.raw.close_with(None);
432
433            info!("disconnected from server");
434
435            Err(MqttError::Disconnect {
436                reason: reason_code,
437                reason_string: reason_string.map(Property::into_inner),
438                server_reference: server_reference.map(Property::into_inner),
439            })
440        }
441    }
442
443    /// Start a ping handshake by sending a PINGRESP packet.
444    ///
445    /// # Errors
446    ///
447    /// * [`MqttError::RecoveryRequired`] if an unrecoverable error occured previously
448    /// * [`MqttError::Network`] if the underlying [`Transport`] returned an error
449    pub async fn ping(&mut self) -> Result<(), MqttError<'c>> {
450        debug!("sending PINGREQ packet");
451
452        // PINGREQ has length 2 which really shouldn't exceed server's max packet size.
453        // If it does the server should reconsider its incarnation as an MQTT server.
454        self.raw.send(&PingreqPacket::new()).await?;
455        self.raw.flush().await?;
456
457        Ok(())
458    }
459
460    /// Subscribes to a single topic with the given options.
461    ///
462    /// The client keeps track of the packet identifier sent in the SUBSCRIBE packet.
463    /// If no [`Event::Suback`] is received within a custom time,
464    /// this method can be used to send the SUBSCRIBE packet again.
465    ///
466    /// A subscription identifier should only be set if the server supports
467    /// subscription identifiers (Can be checked with [`Self::server_config`]).
468    /// The client does not double-check whether this feature is supported and will
469    /// always include the subscription identifier argument if present.
470    ///
471    /// # Returns:
472    /// The packet identifier of the sent SUBSCRIBE packet.
473    ///
474    /// # Errors
475    ///
476    /// * [`MqttError::RecoveryRequired`] if an unrecoverable error occured previously
477    /// * [`MqttError::Network`] if the underlying [`Transport`] returned an error
478    /// * [`MqttError::SessionBuffer`] if the buffer for outgoing SUBSCRIBE packet identifiers is full
479    /// * [`MqttError::ServerMaximumPacketSizeExceeded`] if the server's maximum packet size would be
480    ///   exceeded by sending this SUBSCRIBE packet
481    pub async fn subscribe(
482        &mut self,
483        topic_filter: TopicFilter<'_>,
484        options: SubscriptionOptions,
485    ) -> Result<PacketIdentifier, MqttError<'c>> {
486        if self.pending_suback.len() == MAX_SUBSCRIBES {
487            info!("maximum concurrent subscriptions reached");
488            return Err(MqttError::SessionBuffer);
489        }
490
491        let subscribe_filter = SubscriptionFilter::new(topic_filter, &options);
492
493        let pid = self.packet_identifier();
494        let mut subscribe_filters = Vec::<_, 1>::new();
495        let _ = subscribe_filters.push(subscribe_filter);
496        let packet = SubscribePacket::new(pid, options.subscription_identifier, subscribe_filters)
497            .expect("SUBSCRIBE with a single topic can not exceed VarByteInt::MAX_ENCODABLE");
498
499        if self.server_config.maximum_packet_size.as_u32() < packet.encoded_len() as u32 {
500            return Err(MqttError::ServerMaximumPacketSizeExceeded);
501        }
502
503        debug!("sending SUBSCRIBE packet");
504
505        self.raw.send(&packet).await?;
506        self.raw.flush().await?;
507        self.pending_suback.push(pid).unwrap();
508
509        Ok(pid)
510    }
511
512    /// Unsubscribes from a single topic filter.
513    ///
514    /// The client keeps track of the packet identifier sent in the UNSUBSCRIBE packet.
515    /// If no [`Event::Unsuback`] is received within a custom time,
516    /// this method can be used to send the UNSUBSCRIBE packet again.
517    ///
518    /// # Returns:
519    /// The packet identifier of the sent UNSUBSCRIBE packet.
520    ///
521    /// # Errors
522    ///
523    /// * [`MqttError::RecoveryRequired`] if an unrecoverable error occured previously
524    /// * [`MqttError::Network`] if the underlying [`Transport`] returned an error
525    /// * [`MqttError::SessionBuffer`] if the buffer for outgoing UNSUBSCRIBE packet identifiers is full
526    /// * [`MqttError::ServerMaximumPacketSizeExceeded`] if the server's maximum packet size would be
527    ///   exceeded by sending this UNSUBSCRIBE packet
528    pub async fn unsubscribe(
529        &mut self,
530        topic_filter: TopicFilter<'_>,
531    ) -> Result<PacketIdentifier, MqttError<'c>> {
532        if self.pending_unsuback.len() == MAX_SUBSCRIBES {
533            info!("maximum concurrent unsubscriptions reached");
534            return Err(MqttError::SessionBuffer);
535        }
536
537        let pid = self.packet_identifier();
538        let mut topic_filters = Vec::<_, 1>::new();
539        let _ = topic_filters.push(topic_filter);
540        let packet = UnsubscribePacket::new(pid, topic_filters)
541            .expect("UNSUBSCRIBE with a single topic cannot exceed VarByteInt::MAX_ENCODABLE");
542
543        if self.server_config.maximum_packet_size.as_u32() < packet.encoded_len() as u32 {
544            return Err(MqttError::ServerMaximumPacketSizeExceeded);
545        }
546
547        debug!("sending UNSUBSCRIBE packet");
548
549        self.raw.send(&packet).await?;
550        self.raw.flush().await?;
551        self.pending_unsuback.push(pid).unwrap();
552
553        Ok(pid)
554    }
555
556    /// Publish a message. If [`QoS`] is greater than 0, the packet identifier is also kept track of by the client
557    ///
558    /// # Returns:
559    /// - In case of [`QoS`] 0: [`None`]
560    /// - In case of [`QoS`] 1 or 2: [`Some`] with the packet identifier of the published packet
561    ///
562    /// # Errors
563    ///
564    /// * [`MqttError::RecoveryRequired`] if an unrecoverable error occured previously
565    /// * [`MqttError::Network`] if the underlying [`Transport`] returned an error
566    /// * [`MqttError::SendQuotaExceeded`] if the server's control flow limit is reached and sending
567    ///   the PUBLISH would exceed the limit causing a protocol error
568    /// * [`MqttError::SessionBuffer`] if the buffer for outgoing PUBLISH packet identifiers is full
569    /// * [`MqttError::InvalidTopicAlias`] if a topic alias is used and
570    ///   * its value is 0
571    ///   * its value is greater than the server's maximum topic alias
572    /// * [`MqttError::PacketMaximumLengthExceeded`] if the PUBLISH packet is too long to be encoded
573    ///   with MQTT's [`VarByteInt`]
574    /// * [`MqttError::ServerMaximumPacketSizeExceeded`] if the server's maximum packet size would be
575    ///   exceeded by sending this PUBLISH packet
576    pub async fn publish(
577        &mut self,
578        options: &PublicationOptions<'_>,
579        message: Bytes<'_>,
580    ) -> Result<Option<PacketIdentifier>, MqttError<'c>> {
581        if options.qos > QoS::AtMostOnce {
582            if self.remaining_send_quota() == 0 {
583                info!("server receive maximum reached");
584                return Err(MqttError::SendQuotaExceeded);
585            }
586            if self.session.cpublish_remaining_capacity() == 0 {
587                info!("client maximum concurrent publications reached");
588                return Err(MqttError::SessionBuffer);
589            }
590        }
591
592        let identified_qos = match options.qos {
593            QoS::AtMostOnce => IdentifiedQoS::AtMostOnce,
594            QoS::AtLeastOnce => IdentifiedQoS::AtLeastOnce(self.packet_identifier()),
595            QoS::ExactlyOnce => IdentifiedQoS::ExactlyOnce(self.packet_identifier()),
596        };
597
598        if options
599            .topic
600            .alias()
601            .is_some_and(|a| !(1..=self.server_config.topic_alias_maximum).contains(&a))
602        {
603            return Err(MqttError::InvalidTopicAlias);
604        }
605
606        let packet: PublishPacket<'_, 0> = PublishPacket::new(
607            false,
608            identified_qos,
609            options.retain,
610            options.topic.as_borrowed(),
611            options.payload_format_indicator.map(Into::into),
612            options.message_expiry_interval.map(Into::into),
613            options.response_topic.as_ref().map(TopicName::as_borrowed),
614            options
615                .correlation_data
616                .as_ref()
617                .map(MqttBinary::as_borrowed),
618            options
619                .content_type
620                .as_ref()
621                .map(MqttString::as_borrowed)
622                .map(Into::into),
623            message,
624        )?;
625
626        if self.server_config.maximum_packet_size.as_u32() < packet.encoded_len() as u32 {
627            return Err(MqttError::ServerMaximumPacketSizeExceeded);
628        }
629
630        // Treat the packet as sent before successfully sending. In case of a network error,
631        // we have tracked the packet as in flight and can republish it.
632        let pid = match identified_qos {
633            IdentifiedQoS::AtMostOnce => None,
634            IdentifiedQoS::AtLeastOnce(packet_identifier) => Some({
635                // Safety: `cpublish_remaining_capacity()` > 0 confirms that there is space.
636                unsafe { self.session.await_puback(packet_identifier) };
637                packet_identifier
638            }),
639            IdentifiedQoS::ExactlyOnce(packet_identifier) => Some({
640                // Safety: `cpublish_remaining_capacity()` > 0 confirms that there is space.
641                unsafe { self.session.await_pubrec(packet_identifier) };
642                packet_identifier
643            }),
644        };
645
646        match identified_qos.packet_identifier() {
647            Some(pid) => debug!("sending PUBLISH packet with packet identifier {}", pid),
648            None => debug!("sending PUBLISH packet"),
649        }
650
651        self.raw.send(&packet).await?;
652        self.raw.flush().await?;
653
654        Ok(pid)
655    }
656
657    /// Resends a PUBLISH packet with DUP flag set.
658    ///
659    /// This method must be called and must only be called after a reconnection with clean start set to 0,
660    /// as resending packets at any other time is a protocol error.
661    /// (Compare [Message delivery retry](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901238), \[MQTT-4.4.0-1\]).
662    ///
663    /// For a packet to be resent:
664    /// - it must have a quality of service > 0
665    /// - its packet identifier must have an in flight entry with a quality of service matching the
666    ///   quality of service in the options parameter
667    /// - in case of quality of service 2, it must not already be awaiting a PUBCOMP packet
668    ///
669    /// # Errors
670    ///
671    /// * [`MqttError::RecoveryRequired`] if an unrecoverable error occured previously
672    /// * [`MqttError::Network`] if the underlying [`Transport`] returned an error
673    /// * [`MqttError::RepublishQoSNotMatching`] if the [`QoS`] of this republish does not match the
674    ///   [`QoS`] that this packet identifier was originally published with    
675    /// * [`MqttError::PacketIdentifierAwaitingPubcomp`] if a PUBREC packet with this packet identifier
676    ///   has already been received and the server has therefore already received the PUBLISH
677    /// * [`MqttError::PacketIdentifierNotInFlight`] if this packet identifier is not tracked in the
678    ///   client's session
679    /// * [`MqttError::InvalidTopicAlias`] if a topic alias is used and
680    ///   * its value is 0
681    ///   * its value is greater than the server's maximum topic alias
682    /// * [`MqttError::PacketMaximumLengthExceeded`] if the PUBLISH packet is too long to be encoded
683    ///   with MQTT's [`VarByteInt`]
684    /// * [`MqttError::ServerMaximumPacketSizeExceeded`] if the server's maximum packet size would be
685    ///   exceeded by sending this PUBLISH packet
686    ///
687    /// # Panics
688    ///
689    /// This function may panic if the [`QoS`] in the `options` is [`QoS::AtMostOnce`].
690    pub async fn republish(
691        &mut self,
692        packet_identifier: PacketIdentifier,
693        options: &PublicationOptions<'_>,
694        message: Bytes<'_>,
695    ) -> Result<(), MqttError<'c>> {
696        if options.qos == QoS::AtMostOnce {
697            panic!("QoS 0 packets cannot be republished");
698        }
699
700        let identified_qos = match self.session.cpublish_flight_state(packet_identifier) {
701            Some(CPublishFlightState::AwaitingPuback) if options.qos == QoS::AtLeastOnce => {
702                IdentifiedQoS::AtLeastOnce(packet_identifier)
703            }
704            Some(CPublishFlightState::AwaitingPubrec) if options.qos == QoS::ExactlyOnce => {
705                IdentifiedQoS::ExactlyOnce(packet_identifier)
706            }
707
708            Some(CPublishFlightState::AwaitingPuback) => {
709                warn!(
710                    "packet identifier {} was originally published with QoS 1",
711                    packet_identifier
712                );
713                return Err(MqttError::RepublishQoSNotMatching);
714            }
715            Some(CPublishFlightState::AwaitingPubrec) => {
716                warn!(
717                    "packet identifier {} was originally published with QoS 2",
718                    packet_identifier
719                );
720                return Err(MqttError::RepublishQoSNotMatching);
721            }
722            Some(CPublishFlightState::AwaitingPubcomp) => {
723                warn!(
724                    "packet identifier {} is already awaiting PUBCOMP",
725                    packet_identifier
726                );
727                return Err(MqttError::PacketIdentifierAwaitingPubcomp);
728            }
729            None => {
730                warn!("packet identifier {} not in flight", packet_identifier);
731                return Err(MqttError::PacketIdentifierNotInFlight);
732            }
733        };
734
735        if options
736            .topic
737            .alias()
738            .is_some_and(|a| !(1..=self.server_config.topic_alias_maximum).contains(&a))
739        {
740            return Err(MqttError::InvalidTopicAlias);
741        }
742
743        let packet: PublishPacket<'_, 0> = PublishPacket::new(
744            true,
745            identified_qos,
746            options.retain,
747            options.topic.as_borrowed(),
748            options.payload_format_indicator.map(Into::into),
749            options.message_expiry_interval.map(Into::into),
750            options.response_topic.as_ref().map(TopicName::as_borrowed),
751            options
752                .correlation_data
753                .as_ref()
754                .map(MqttBinary::as_borrowed),
755            options
756                .content_type
757                .as_ref()
758                .map(MqttString::as_borrowed)
759                .map(Into::into),
760            message,
761        )?;
762
763        if self.server_config.maximum_packet_size.as_u32() < packet.encoded_len() as u32 {
764            return Err(MqttError::ServerMaximumPacketSizeExceeded);
765        }
766
767        // We only republish a message if its quality of service and flight state is correct.
768        // In this case, we don't have to change its in flight tracking state as it already
769        // is in the desired state.
770
771        debug!(
772            "resending PUBLISH packet with packet identifier {}",
773            packet_identifier
774        );
775
776        self.raw.send(&packet).await?;
777        self.raw.flush().await?;
778
779        Ok(())
780    }
781
782    /// Resends all pending PUBREL packets.
783    ///
784    /// This method must be called and must only be called after a reconnection
785    /// with clean start set to 0, as resending packets at any other time is a protocol error.
786    /// (Compare [Message delivery retry](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901238), \[MQTT-4.4.0-1\]).
787    ///
788    /// This method assumes that the server's receive maximum after the reconnection is great enough
789    /// to handle as many publication flows as dragged between the two connections.
790    ///
791    /// # Errors
792    ///
793    /// * [`MqttError::RecoveryRequired`] if an unrecoverable error occured previously
794    /// * [`MqttError::Network`] if the underlying [`Transport`] returned an error
795    pub async fn rerelease(&mut self) -> Result<(), MqttError<'c>> {
796        for packet_identifier in self
797            .session
798            .pending_client_publishes
799            .iter()
800            .filter(|s| s.state == CPublishFlightState::AwaitingPubcomp)
801            .map(|p| p.packet_identifier)
802        {
803            let pubrel = PubrelPacket::new(packet_identifier, ReasonCode::Success);
804
805            // Don't check whether length exceeds servers maximum packet size because we don't
806            // add properties to PUBREL packets -> length is always minimal at 6 bytes.
807            // The server really shouldn't reject this.
808            self.raw.send(&pubrel).await?;
809        }
810
811        self.raw.flush().await?;
812
813        Ok(())
814    }
815
816    /// Disconnects from the server after an error occured in a situation-aware way by either:
817    /// - dropping the connection
818    /// - sending a DISCONNECT with the deposited reason code and dropping the connection.
819    ///
820    /// After an MQTT communication fails, usually either the client or the server closes the connection.
821    ///
822    /// This is not cancel-safe but you can set a timeout if reconnecting later anyway or you don't reuse the client.
823    #[inline]
824    pub async fn abort(&mut self) {
825        match self.raw.abort().await {
826            Ok(()) => info!("connection aborted"),
827            Err(e) => warn!("connection abort failed: {:?}", e),
828        }
829    }
830
831    /// Disconnects gracefully from the server by sending a DISCONNECT packet.
832    ///
833    /// # Preconditions:
834    /// - The client did not return a non-recoverable Error before
835    ///
836    /// # Errors
837    ///
838    /// * [`MqttError::RecoveryRequired`] if an unrecoverable error occured previously
839    /// * [`MqttError::Network`] if the underlying [`Transport`] returned an error
840    /// * [`MqttError::IllegalDisconnectSessionExpiryInterval`] if the session expiry interval in the
841    ///   CONNECT packet was zero and the session expiry interval in the [`DisconnectOptions`] is [`Some`]
842    ///   and not [`SessionExpiryInterval::EndOnDisconnect`].
843    pub async fn disconnect(&mut self, options: &DisconnectOptions) -> Result<(), MqttError<'c>> {
844        let connect_session_expiry_interval_was_zero =
845            self.client_config.session_expiry_interval == SessionExpiryInterval::EndOnDisconnect;
846        let disconnect_session_expiry_interval_is_non_zero = options
847            .session_expiry_interval
848            .is_some_and(|s| s != SessionExpiryInterval::EndOnDisconnect);
849
850        if connect_session_expiry_interval_was_zero
851            && disconnect_session_expiry_interval_is_non_zero
852        {
853            return Err(MqttError::IllegalDisconnectSessionExpiryInterval);
854        }
855
856        let reason_code = if options.publish_will {
857            ReasonCode::DisconnectWithWillMessage
858        } else {
859            ReasonCode::Success
860        };
861
862        let mut packet = DisconnectPacket::new(reason_code);
863        if let Some(s) = options.session_expiry_interval {
864            packet.add_session_expiry_interval(s);
865        }
866
867        debug!("sending DISCONNECT packet");
868
869        // Don't check whether length exceeds servers maximum packet size because we don't
870        // add a reason string to the DISCONNECT packet -> length is always in the 4..=9 range in bytes.
871        // The server really shouldn't reject this.
872        self.raw.send(&packet).await?;
873        self.raw.flush().await?;
874
875        // Terminates (closes) the connection by dropping it
876        self.raw.close_with(None);
877
878        info!("disconnected from server");
879
880        Ok(())
881    }
882
883    /// Combines [`Self::poll_header`] and [`Self::poll_body`].
884    ///
885    /// Polls the network for a full packet. Not cancel-safe.
886    ///
887    /// # Preconditions:
888    /// - The last MQTT packet was received completely
889    /// - The client did not return a non-recoverable Error before
890    ///
891    /// # Returns:
892    /// MQTT Events. Their further meaning is documented in [`Event`].
893    ///
894    /// # Errors
895    ///
896    /// Returns the errors that [`Client::poll_header`] and [`Client::poll_body`] return.
897    /// For further information view their docs.
898    pub async fn poll(&mut self) -> Result<Event<'c, MAX_SUBSCRIPTION_IDENTIFIERS>, MqttError<'c>> {
899        let header = self.poll_header().await?;
900        self.poll_body(header).await
901    }
902
903    /// Polls the network for a fixed header in a cancel-safe way.
904    ///
905    /// If a fixed header is received, the first 4 bits (packet type) are checked for correctness.
906    ///
907    /// # Preconditions:
908    /// - The last MQTT packet was received completely
909    /// - The client did not return a non-recoverable Error before
910    ///
911    /// # Returns:
912    /// The received fixed header with a valid packet type. It can be used to call [`Self::poll_body`].
913    ///
914    /// # Errors
915    ///
916    /// * [`MqttError::RecoveryRequired`] if an unrecoverable error occured previously
917    /// * [`MqttError::Network`] if the underlying [`Transport`] returned an error
918    /// * [`MqttError::Server`] if:
919    ///   * the server sends a malformed packet header
920    ///   * the packet following this header exceeds the client's maximum packet size
921    pub async fn poll_header(&mut self) -> Result<FixedHeader, MqttError<'c>> {
922        let header = self.raw.recv_header().await?;
923
924        if let Ok(p) = header.packet_type() {
925            debug!(
926                "received {:?} packet header (remaining length: {})",
927                p,
928                header.remaining_len.value()
929            );
930        } else {
931            error!("received invalid header {:?}", header);
932            self.raw.close_with(Some(ReasonCode::MalformedPacket));
933            return Err(MqttError::Server);
934        }
935
936        if header.remaining_len.value() > self.client_config.maximum_accepted_remaining_length {
937            error!(
938                "received a packet exceeding maximum packet size, remaining length={:?}",
939                header.remaining_len.value()
940            );
941            self.raw.close_with(Some(ReasonCode::PacketTooLarge));
942            return Err(MqttError::Server);
943        }
944
945        Ok(header)
946    }
947
948    /// Polls the network for the variable header and payload of a packet. Not cancel-safe.
949    ///
950    /// # Preconditions:
951    /// - The [`FixedHeader`] argument was received from the network right before.
952    /// - The client did not return a non-recoverable [`MqttError`] before
953    ///
954    /// # Returns:
955    /// MQTT Events for regular communication. Their further meaning is documented in [`Event`].
956    ///
957    /// # Errors
958    ///
959    /// * [`MqttError::RecoveryRequired`] if an unrecoverable error occured previously
960    /// * [`MqttError::Network`] if the underlying [`Transport`] returned an error
961    /// * [`MqttError::Alloc`] if the underlying [`BufferProvider`] returned an error
962    /// * [`MqttError::Server`] if:
963    ///   * the server sends a malformed packet
964    ///   * the server causes a protocol error
965    ///   * the packet following this header exceeds the client's maximum packet size
966    ///   * the server sends a PUBLISH packet with an invalid topic alias
967    ///   * the server exceeded the client's receive maximum with a new [`QoS`] 2 PUBLISH
968    ///   * the server sends a PUBACK/PUBREC/PUBREL/PUBCOMP packet which mismatches what
969    ///     the client expects for this packet identifier from its session state
970    ///   * the fixed header has the packet type CONNECT/SUBSCRIBE/UNSUBSCRIBE/PINGREQ
971    /// * [`MqttError::Disconnect`] if a DISCONNECT packet is received
972    /// * [`MqttError::AuthPacketReceived`] if the fixed header has the packet type AUTH
973    pub async fn poll_body(
974        &mut self,
975        header: FixedHeader,
976    ) -> Result<Event<'c, MAX_SUBSCRIPTION_IDENTIFIERS>, MqttError<'c>> {
977        let event = match header.packet_type()? {
978            PacketType::Pingresp => {
979                self.raw.recv_body::<PingrespPacket>(&header).await?;
980                Event::Pingresp
981            }
982            PacketType::Suback => {
983                // We only send SUBSCRIBE packets with exactly 1 topic
984                // -> Packets with more than 1 reason code are currently rejected by the RxPacket::receive implementation
985                //    with RxError::Protocol error. This is correct as long as we only send SUBSCRIBE packets with 1 topic.
986                let suback = self.raw.recv_body::<SubackPacket<'_, 1>>(&header).await?;
987                let pid = suback.packet_identifier;
988
989                if Self::remove_packet_identifier_if_exists(&mut self.pending_suback, pid) {
990                    // We only send SUBSCRIBE packets with exactly 1 topic
991                    if suback.reason_codes.len() != 1 {
992                        error!("received mismatched SUBACK");
993                        self.raw.close_with(Some(ReasonCode::ProtocolError));
994                        return Err(MqttError::Server);
995                    }
996
997                    let r = suback.reason_codes.first().unwrap();
998
999                    Event::Suback(Suback {
1000                        packet_identifier: pid,
1001                        reason_code: *r,
1002                    })
1003                } else {
1004                    debug!("packet identifier {} in SUBACK not in use", pid);
1005                    Event::Ignored
1006                }
1007            }
1008            PacketType::Unsuback => {
1009                // We only send UNSUBSCRIBE packets with exactly 1 topic
1010                // -> Packets with more than 1 reason code are currently rejected by the RxPacket::receive implementation
1011                //    with RxError::Protocol error. This is correct as long as we only send UNSUBSCRIBE packets with 1 topic.
1012                let unsuback = self.raw.recv_body::<UnsubackPacket<'_, 1>>(&header).await?;
1013                let pid = unsuback.packet_identifier;
1014
1015                if Self::remove_packet_identifier_if_exists(&mut self.pending_unsuback, pid) {
1016                    // We only send UNSUBSCRIBE packets with exactly 1 topic
1017                    if unsuback.reason_codes.len() != 1 {
1018                        error!("received mismatched UNSUBACK");
1019                        self.raw.close_with(Some(ReasonCode::ProtocolError));
1020                        return Err(MqttError::Server);
1021                    }
1022
1023                    let r = unsuback.reason_codes.first().unwrap();
1024
1025                    Event::Unsuback(Suback {
1026                        packet_identifier: pid,
1027                        reason_code: *r,
1028                    })
1029                } else {
1030                    debug!("packet identifier {} in UNSUBACK not in use", pid);
1031                    Event::Ignored
1032                }
1033            }
1034            PacketType::Publish => {
1035                let publish = self
1036                    .raw
1037                    .recv_body::<PublishPacket<'_, MAX_SUBSCRIPTION_IDENTIFIERS>>(&header)
1038                    .await?;
1039
1040                // Our topic alias maximum is always 0, the moment we receive a topic alias, this is an error.
1041                let TopicReference::Name(topic) = publish.topic else {
1042                    error!("received disallowed topic alias");
1043                    self.raw.close_with(Some(ReasonCode::TopicAliasInvalid));
1044                    return Err(MqttError::Server);
1045                };
1046
1047                let publish = Publish {
1048                    dup: publish.dup,
1049                    identified_qos: publish.identified_qos,
1050                    retain: publish.retain,
1051                    topic,
1052                    payload_format_indicator: publish
1053                        .payload_format_indicator
1054                        .map(Property::into_inner),
1055                    message_expiry_interval: publish
1056                        .message_expiry_interval
1057                        .map(Property::into_inner),
1058                    response_topic: publish.response_topic.map(Property::into_inner),
1059                    correlation_data: publish.correlation_data.map(Property::into_inner),
1060                    subscription_identifiers: publish
1061                        .subscription_identifiers
1062                        .into_iter()
1063                        .map(Property::into_inner)
1064                        .collect(),
1065                    content_type: publish.content_type.map(Property::into_inner),
1066                    message: publish.message,
1067                };
1068
1069                match publish.identified_qos {
1070                    IdentifiedQoS::AtMostOnce => {
1071                        debug!("received QoS 0 publication");
1072
1073                        Event::Publish(publish)
1074                    }
1075                    IdentifiedQoS::AtLeastOnce(pid) => {
1076                        debug!("received QoS 1 publication with packet identifier {}", pid);
1077
1078                        // We could disconnect here using ReasonCode::ReceiveMaximumExceeded, but incoming QoS 1 publications
1079                        // don't require resources outside of this scope which means we can just accept these packets.
1080
1081                        let puback = PubackPacket::new(pid, ReasonCode::Success);
1082
1083                        debug!("sending PUBACK packet");
1084
1085                        // Don't check whether length exceeds servers maximum packet size because we don't
1086                        // add properties to PUBACK packets -> length is always minimal at 6 bytes.
1087                        // The server really shouldn't reject this.
1088                        self.raw.send(&puback).await?;
1089                        self.raw.flush().await?;
1090
1091                        Event::Publish(publish)
1092                    }
1093                    IdentifiedQoS::ExactlyOnce(pid) => {
1094                        debug!("received QoS 2 publication with packet identifier {}", pid);
1095
1096                        let event = match self.session.spublish_flight_state(pid) {
1097                            Some(SPublishFlightState::AwaitingPubrel) => Event::Duplicate,
1098                            None if self.session.spublish_remaining_capacity() > 0 => {
1099                                // Safety: `spublish_remaining_capacity()` > 0 confirms that there is space.
1100                                unsafe { self.session.await_pubrel(pid) };
1101                                Event::Publish(publish)
1102                            }
1103                            None => {
1104                                error!("server exceeded receive maximum");
1105                                self.raw
1106                                    .close_with(Some(ReasonCode::ReceiveMaximumExceeded));
1107                                return Err(MqttError::Server);
1108                            }
1109                        };
1110
1111                        let pubrec = PubrecPacket::new(pid, ReasonCode::Success);
1112
1113                        debug!("sending PUBREC packet");
1114
1115                        // Don't check whether length exceeds servers maximum packet size because we don't
1116                        // add properties to PUBREC packets -> length is always minimal at 6 bytes.
1117                        // The server really shouldn't reject this.
1118                        self.raw.send(&pubrec).await?;
1119                        self.raw.flush().await?;
1120
1121                        event
1122                    }
1123                }
1124            }
1125            PacketType::Puback => {
1126                let puback = self.raw.recv_body::<PubackPacket>(&header).await?;
1127                let pid = puback.packet_identifier;
1128                let reason_code = puback.reason_code;
1129
1130                match self.session.remove_cpublish(pid) {
1131                    Some(CPublishFlightState::AwaitingPuback) if reason_code.is_success() => {
1132                        debug!("publication with packet identifier {} complete", pid);
1133
1134                        Event::PublishAcknowledged(Puback {
1135                            packet_identifier: pid,
1136                            reason_code,
1137                        })
1138                    }
1139                    Some(CPublishFlightState::AwaitingPuback) => {
1140                        debug!("publication with packet identifier {} aborted", pid);
1141
1142                        Event::PublishRejected(Pubrej {
1143                            packet_identifier: pid,
1144                            reason_code,
1145                        })
1146                    }
1147                    Some(
1148                        s @ CPublishFlightState::AwaitingPubrec
1149                        | s @ CPublishFlightState::AwaitingPubcomp,
1150                    ) => {
1151                        warn!("packet identifier {} in PUBACK is actually {:?}", pid, s);
1152
1153                        // Readd this packet identifier to the session so that it can be republished
1154                        // after reconnecting.
1155
1156                        // Safety: Session::remove_cpublish returning Some and therefore successfully
1157                        // removing a cpublish frees space to add a new in flight entry.
1158                        unsafe { self.session.r#await(pid, s) };
1159
1160                        error!("received mismatched PUBACK");
1161                        self.raw.close_with(Some(ReasonCode::ProtocolError));
1162                        return Err(MqttError::Server);
1163                    }
1164                    None => {
1165                        debug!("packet identifier {} in PUBACK not in use", pid);
1166                        Event::Ignored
1167                    }
1168                }
1169            }
1170            PacketType::Pubrec => {
1171                let pubrec = self.raw.recv_body::<PubrecPacket>(&header).await?;
1172                let pid = pubrec.packet_identifier;
1173                let reason_code = pubrec.reason_code;
1174
1175                match self.session.remove_cpublish(pid) {
1176                    Some(CPublishFlightState::AwaitingPubrec) if reason_code.is_success() => {
1177                        // Safety: Session::remove_cpublish returning Some and therefore successfully
1178                        // removing a cpublish frees space to add a new in flight entry.
1179                        unsafe { self.session.await_pubcomp(pid) };
1180
1181                        let pubrel = PubrelPacket::new(pid, ReasonCode::Success);
1182
1183                        debug!("sending PUBREL packet");
1184
1185                        // Don't check whether length exceeds servers maximum packet size because we don't
1186                        // add properties to PUBREL packets -> length is always minimal at 6 bytes.
1187                        // The server really shouldn't reject this.
1188                        self.raw.send(&pubrel).await?;
1189                        self.raw.flush().await?;
1190
1191                        Event::PublishReceived(Puback {
1192                            packet_identifier: pid,
1193                            reason_code,
1194                        })
1195                    }
1196                    Some(CPublishFlightState::AwaitingPubrec) => {
1197                        // After receiving an erroneous PUBREC, we have to treat any subsequent PUBLISH packet
1198                        // with the same packet identifier as a new message. This is achieved by already having
1199                        // removed the packet identifier's in flight entry.
1200
1201                        debug!("publication with packet identifier {} aborted", pid);
1202
1203                        Event::PublishRejected(Pubrej {
1204                            packet_identifier: pid,
1205                            reason_code,
1206                        })
1207                    }
1208                    Some(
1209                        s @ CPublishFlightState::AwaitingPuback
1210                        | s @ CPublishFlightState::AwaitingPubcomp,
1211                    ) => {
1212                        warn!("packet identifier {} in PUBREC is actually {:?}", pid, s);
1213
1214                        // Readd this packet identifier to the session so that it can be republished
1215                        // after reconnecting.
1216
1217                        // Safety: Session::remove_cpublish returning Some and therefore successfully
1218                        // removing a cpublish frees space to add a new in flight entry.
1219                        unsafe { self.session.r#await(pid, s) };
1220
1221                        error!("received mismatched PUBREC");
1222                        self.raw.close_with(Some(ReasonCode::ProtocolError));
1223                        return Err(MqttError::Server);
1224                    }
1225                    None => {
1226                        debug!("packet identifier {} in PUBREC not in use", pid);
1227
1228                        let pubrel = PubrelPacket::new(pid, ReasonCode::PacketIdentifierNotFound);
1229
1230                        debug!("sending PUBREL packet");
1231
1232                        // Don't check whether length exceeds servers maximum packet size because we don't
1233                        // add properties to PUBREL packets -> length is always minimal at 6 bytes.
1234                        // The server really shouldn't reject this.
1235                        self.raw.send(&pubrel).await?;
1236                        self.raw.flush().await?;
1237
1238                        Event::Ignored
1239                    }
1240                }
1241            }
1242            PacketType::Pubrel => {
1243                let pubrel = self.raw.recv_body::<PubrelPacket>(&header).await?;
1244                let pid = pubrel.packet_identifier;
1245                let reason_code = pubrel.reason_code;
1246
1247                match self.session.remove_spublish(pid) {
1248                    Some(SPublishFlightState::AwaitingPubrel) if reason_code.is_success() => {
1249                        let pubcomp = PubcompPacket::new(pid, ReasonCode::Success);
1250
1251                        debug!("sending PUBCOMP packet");
1252
1253                        // Don't check whether length exceeds servers maximum packet size because we don't
1254                        // add properties to PUBCOMP packets -> length is always minimal at 6 bytes.
1255                        // The server really shouldn't reject this.
1256                        self.raw.send(&pubcomp).await?;
1257                        self.raw.flush().await?;
1258
1259                        Event::PublishReleased(Puback {
1260                            packet_identifier: pid,
1261                            reason_code,
1262                        })
1263                    }
1264                    Some(SPublishFlightState::AwaitingPubrel) => {
1265                        debug!("publication with packet identifier {} aborted", pid);
1266
1267                        Event::PublishRejected(Pubrej {
1268                            packet_identifier: pid,
1269                            reason_code,
1270                        })
1271                    }
1272                    None => {
1273                        debug!("packet identifier {} in PUBREL not in use", pid);
1274
1275                        let pubcomp = PubcompPacket::new(pid, ReasonCode::PacketIdentifierNotFound);
1276
1277                        debug!("sending PUBCOMP packet");
1278
1279                        // Don't check whether length exceeds servers maximum packet size because we don't
1280                        // add properties to PUBCOMP packets -> length is always minimal at 6 bytes.
1281                        // The server really shouldn't reject this.
1282                        self.raw.send(&pubcomp).await?;
1283                        self.raw.flush().await?;
1284
1285                        Event::Ignored
1286                    }
1287                }
1288            }
1289            PacketType::Pubcomp => {
1290                let pubcomp = self.raw.recv_body::<PubcompPacket>(&header).await?;
1291                let pid = pubcomp.packet_identifier;
1292                let reason_code = pubcomp.reason_code;
1293
1294                match self.session.remove_cpublish(pid) {
1295                    Some(CPublishFlightState::AwaitingPubcomp) if reason_code.is_success() => {
1296                        debug!("publication with packet identifier {} complete", pid);
1297
1298                        Event::PublishComplete(Puback {
1299                            packet_identifier: pid,
1300                            reason_code: pubcomp.reason_code,
1301                        })
1302                    }
1303                    Some(CPublishFlightState::AwaitingPubcomp) => {
1304                        debug!("publication with packet identifier {} aborted", pid);
1305
1306                        Event::PublishRejected(Pubrej {
1307                            packet_identifier: pid,
1308                            reason_code,
1309                        })
1310                    }
1311                    Some(
1312                        s @ CPublishFlightState::AwaitingPuback
1313                        | s @ CPublishFlightState::AwaitingPubrec,
1314                    ) => {
1315                        warn!("packet identifier {} in PUBCOMP is actually {:?}", pid, s);
1316
1317                        // Readd this packet identifier to the session so that it can be republished
1318                        // after reconnecting.
1319
1320                        // Safety: Session::remove_cpublish returning Some and therefore successfully
1321                        // removing a cpublish frees space to add a new in flight entry.
1322                        unsafe { self.session.r#await(pid, s) };
1323
1324                        error!("received mismatched PUBCOMP");
1325                        self.raw.close_with(Some(ReasonCode::ProtocolError));
1326                        return Err(MqttError::Server);
1327                    }
1328                    None => {
1329                        debug!("packet identifier {} in PUBCOMP not in use", pid);
1330                        Event::Ignored
1331                    }
1332                }
1333            }
1334            PacketType::Disconnect => {
1335                let disconnect = self.raw.recv_body::<DisconnectPacket>(&header).await?;
1336
1337                return Err(MqttError::Disconnect {
1338                    reason: disconnect.reason_code,
1339                    reason_string: disconnect.reason_string.map(Property::into_inner),
1340                    server_reference: disconnect.server_reference.map(Property::into_inner),
1341                });
1342            }
1343            t @ (PacketType::Connect
1344            | PacketType::Subscribe
1345            | PacketType::Unsubscribe
1346            | PacketType::Pingreq) => {
1347                error!(
1348                    "received a packet that the server is not allowed to send: {:?}",
1349                    t
1350                );
1351
1352                self.raw.close_with(Some(ReasonCode::ProtocolError));
1353                return Err(MqttError::Server);
1354            }
1355            PacketType::Connack => {
1356                error!("received unexpected CONNACK packet");
1357
1358                self.raw.close_with(Some(ReasonCode::ProtocolError));
1359                return Err(MqttError::Server);
1360            }
1361            PacketType::Auth => {
1362                error!("received unexpected AUTH packet");
1363
1364                // Receiving a AUTH packet is currently always a protocol error because we never send
1365                // an Authentication Method property in the CONNECT packet.
1366                // <https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901217>
1367                self.raw.close_with(Some(ReasonCode::ProtocolError));
1368                return Err(MqttError::AuthPacketReceived);
1369            }
1370        };
1371
1372        Ok(event)
1373    }
1374}