rust_mqtt/client/mod.rs
1//! Implements full client functionality with session and configuration handling and Quality of Service flows.
2
3use core::num::NonZero;
4
5use heapless::Vec;
6
7use crate::{
8 buffer::BufferProvider,
9 bytes::Bytes,
10 client::{
11 event::{Event, Puback, Publish, Pubrej, Suback},
12 info::ConnectInfo,
13 options::{
14 ConnectOptions, DisconnectOptions, PublicationOptions, SubscriptionOptions,
15 TopicReference,
16 },
17 raw::Raw,
18 },
19 config::{ClientConfig, MaximumPacketSize, ServerConfig, SessionExpiryInterval, SharedConfig},
20 fmt::{assert, debug, error, info, panic, trace, unreachable, warn},
21 header::{FixedHeader, PacketType},
22 io::Transport,
23 packet::{Packet, TxPacket},
24 session::{CPublishFlightState, SPublishFlightState, Session},
25 types::{
26 IdentifiedQoS, MqttBinary, MqttString, PacketIdentifier, QoS, ReasonCode,
27 SubscriptionFilter, TopicFilter, TopicName, VarByteInt,
28 },
29 v5::{
30 packet::{
31 ConnackPacket, ConnectPacket, DisconnectPacket, PingreqPacket, PingrespPacket,
32 PubackPacket, PubcompPacket, PublishPacket, PubrecPacket, PubrelPacket, SubackPacket,
33 SubscribePacket, UnsubackPacket, UnsubscribePacket,
34 },
35 property::Property,
36 },
37};
38
39mod err;
40
41pub mod event;
42pub mod info;
43pub mod options;
44pub mod raw;
45
46pub use err::Error as MqttError;
47
48/// An MQTT client.
49///
50/// Configuration via const parameters:
51/// - `MAX_SUBSCRIBES`: The maximum amount of in-flight/unacknowledged SUBSCRIBE packets (one per call to [`Self::subscribe`]).
52/// - `RECEIVE_MAXIMUM`: MQTT's control flow mechanism. The maximum amount of incoming [`QoS::AtLeastOnce`] and
53/// [`QoS::ExactlyOnce`] publications (accumulated). Must not be 0 and must not be greater than 65535.
54/// - `SEND_MAXIMUM`: The maximum amount of outgoing [`QoS::AtLeastOnce`] and [`QoS::ExactlyOnce`] publications. The server
55/// can further limit this with its receive maximum. The client will use the minimum of this value and [`Self::server_config`].
56/// - `MAX_SUBSCRIPTION_IDENTIFIERS`: The maximum amount of subscription identifier properties the client can receive within a
57/// single PUBLISH packet. If a packet with more subscription identifiers is received, the later identifers will be discarded.
58#[derive(Debug)]
59#[cfg_attr(feature = "defmt", derive(defmt::Format))]
60pub struct Client<
61 'c,
62 N: Transport,
63 B: BufferProvider<'c>,
64 const MAX_SUBSCRIBES: usize,
65 const RECEIVE_MAXIMUM: usize,
66 const SEND_MAXIMUM: usize,
67 const MAX_SUBSCRIPTION_IDENTIFIERS: usize,
68> {
69 client_config: ClientConfig,
70 shared_config: SharedConfig,
71 server_config: ServerConfig,
72 session: Session<RECEIVE_MAXIMUM, SEND_MAXIMUM>,
73
74 raw: Raw<'c, N, B>,
75
76 packet_identifier_counter: PacketIdentifier,
77
78 /// sent SUBSCRIBE packets
79 pending_suback: Vec<PacketIdentifier, MAX_SUBSCRIBES>,
80 /// sent UNSUBSCRIBE packets
81 pending_unsuback: Vec<PacketIdentifier, MAX_SUBSCRIBES>,
82}
83
84impl<
85 'c,
86 N: Transport,
87 B: BufferProvider<'c>,
88 const MAX_SUBSCRIBES: usize,
89 const RECEIVE_MAXIMUM: usize,
90 const SEND_MAXIMUM: usize,
91 const MAX_SUBSCRIPTION_IDENTIFIERS: usize,
92> Client<'c, N, B, MAX_SUBSCRIBES, RECEIVE_MAXIMUM, SEND_MAXIMUM, MAX_SUBSCRIPTION_IDENTIFIERS>
93{
94 /// Creates a new, disconnected MQTT client using a buffer provider to store
95 /// dynamically sized fields of received packets.
96 /// The session state is initialised as a new session. If you want to start the
97 /// client with an existing session, use [`Self::with_session`].
98 pub fn new(buffer: &'c mut B) -> Self {
99 assert!(
100 RECEIVE_MAXIMUM <= 65535,
101 "RECEIVE_MAXIMUM must be less than or equal to 65535"
102 );
103 assert!(
104 RECEIVE_MAXIMUM > 0,
105 "RECEIVE_MAXIMUM must be greater than 0"
106 );
107
108 Self {
109 client_config: ClientConfig::default(),
110 shared_config: SharedConfig::default(),
111 server_config: ServerConfig::default(),
112 session: Session::default(),
113
114 raw: Raw::new_disconnected(buffer),
115
116 packet_identifier_counter: PacketIdentifier::ONE,
117
118 pending_suback: Vec::new(),
119 pending_unsuback: Vec::new(),
120 }
121 }
122
123 /// Creates a new, disconnected MQTT client using a buffer provider to store
124 /// dynamically sized fields of received packets.
125 pub fn with_session(
126 session: Session<RECEIVE_MAXIMUM, SEND_MAXIMUM>,
127 buffer: &'c mut B,
128 ) -> Self {
129 let mut s = Self::new(buffer);
130 s.session = session;
131 s
132 }
133
134 /// Returns the amount of publications the client is allowed to make according to the server's
135 /// receive maximum. Does not account local space for storing publication state.
136 fn remaining_send_quota(&self) -> u16 {
137 self.server_config.receive_maximum.into_inner().get() - self.session.in_flight_cpublishes()
138 }
139
140 fn is_packet_identifier_used(&self, packet_identifier: PacketIdentifier) -> bool {
141 self.session
142 .is_used_cpublish_packet_identifier(packet_identifier)
143 || self.pending_suback.contains(&packet_identifier)
144 || self.pending_unsuback.contains(&packet_identifier)
145 }
146
147 /// Returns configuration for this client.
148 #[inline]
149 pub fn client_config(&self) -> &ClientConfig {
150 &self.client_config
151 }
152
153 /// Returns the configuration of the currently or last connected server if there is one.
154 #[inline]
155 pub fn server_config(&self) -> &ServerConfig {
156 &self.server_config
157 }
158
159 /// Returns the configuration negotiated between the client and server.
160 #[inline]
161 pub fn shared_config(&self) -> &SharedConfig {
162 &self.shared_config
163 }
164
165 /// Returns session related configuration and tracking information.
166 #[inline]
167 pub fn session(&self) -> &Session<RECEIVE_MAXIMUM, SEND_MAXIMUM> {
168 &self.session
169 }
170
171 /// Returns an immutable reference to the supplied [`BufferProvider`] implementation.
172 #[inline]
173 pub fn buffer(&self) -> &B {
174 self.raw.buffer()
175 }
176
177 /// Returns a mutable reference to the supplied [`BufferProvider`] implementation.
178 ///
179 /// This can for example be used to reset the underlying buffer if using `BumpBuffer`.
180 #[inline]
181 pub fn buffer_mut(&mut self) -> &mut B {
182 self.raw.buffer_mut()
183 }
184
185 /// Generates a new packet identifier.
186 fn packet_identifier(&mut self) -> PacketIdentifier {
187 loop {
188 let packet_identifier = self.packet_identifier_counter;
189
190 self.packet_identifier_counter = packet_identifier.next();
191
192 if !self.is_packet_identifier_used(packet_identifier) {
193 break packet_identifier;
194 }
195 }
196 }
197
198 /// Returns true if the packet identifier exists.
199 fn remove_packet_identifier_if_exists<const M: usize>(
200 vec: &mut Vec<PacketIdentifier, M>,
201 pid: PacketIdentifier,
202 ) -> bool {
203 if let Some(i) = vec.iter().position(|p| *p == pid) {
204 // Safety: The index has just been found in the vector
205 unsafe { vec.swap_remove_unchecked(i) };
206 true
207 } else {
208 false
209 }
210 }
211
212 /// Connect the client to an MQTT server on the other end of the `net` argument.
213 /// Sends a CONNECT message and awaits the CONNACK response by the server.
214 ///
215 /// Only call this when
216 /// - the client is newly constructed.
217 /// - a non-recoverable error has occured and [`Self::abort`] has been called.
218 /// - [`Self::disconnect`] has been called.
219 ///
220 /// The session expiry interval in [`ConnectOptions`] overrides the one in the session of the client.
221 ///
222 /// Configuration that was negotiated with the server is stored in the `client_config`,
223 /// `server_config`, `shared_config`, and `session` fields, which have getters
224 /// ([`Self::client_config`], [`Self::server_config`], [`Self::shared_config`],
225 /// [`Self::session`]).
226 ///
227 /// If the server does not have a session present, the client's session is cleared. In case you would want
228 /// to keep the session state, you can call [`Self::session`] and clone the session before.
229 ///
230 /// # Returns:
231 /// Information about the session/connection that the client does currently not use and therefore not store
232 /// in its configuration fields.
233 ///
234 /// # Errors
235 ///
236 /// * [`MqttError::Server`] if:
237 /// * the server sends a malformed packet
238 /// * the first received packet is something other than a CONNACK packet
239 /// * `client_identifier` is [`None`] and the server did not assign a client identifier
240 /// * the server causes a protocol error
241 /// * [`MqttError::Disconnect`] if the CONNACK packet's reason code is not successful (>= 0x80)
242 /// * [`MqttError::Network`] if the underlying [`Transport`] returned an error
243 /// * [`MqttError::Alloc`] if the underlying [`BufferProvider`] returned an error
244 pub async fn connect<'d>(
245 &mut self,
246 net: N,
247 options: &ConnectOptions<'_>,
248 client_identifier: Option<MqttString<'d>>,
249 ) -> Result<ConnectInfo<'d>, MqttError<'c>>
250 where
251 'c: 'd,
252 {
253 if options.clean_start {
254 self.session.clear();
255 }
256
257 self.pending_suback.clear();
258 self.pending_unsuback.clear();
259
260 self.raw.set_net(net);
261
262 // Set client session expiry interval because it is relevant to determine
263 // which session expiry interval can be sent in DISCONNECT packet.
264 self.client_config.session_expiry_interval = options.session_expiry_interval;
265
266 // Empirical maximum packet size mapping
267 // -------------------------------------------------------------------------------------------------------
268 // remaining length | fixed header length | max packet size
269 // 0..=127 | 2 | 2..=129
270 // 128..=16_383 | 3 | 131..=16_386
271 // 16_384..=2_097_151 | 4 | 16_388..=2_097_155
272 // 2_097_152..=VarByteInt::MAX_ENCODABLE | 5 | 2_097_157..=(VarByteInt::MAX_ENCODABLE+5)
273
274 const MAX_POSSIBLE_PACKET_SIZE: u32 = VarByteInt::MAX_ENCODABLE + 5;
275
276 self.client_config.maximum_accepted_remaining_length = match options.maximum_packet_size {
277 MaximumPacketSize::Unlimited => u32::MAX,
278 MaximumPacketSize::Limit(l) => match l.get() {
279 0 => unreachable!("NonZero invariant"),
280 1 => panic!(
281 "Every MQTT packet is at least 2 bytes long, a smaller maximum packet size makes no sense"
282 ),
283 2..=129 => l.get() - 2,
284 130..=16_386 => l.get() - 3,
285 16_387..=2_097_155 => l.get() - 4,
286 2_097_156..MAX_POSSIBLE_PACKET_SIZE => l.get() - 5,
287 MAX_POSSIBLE_PACKET_SIZE.. => VarByteInt::MAX_ENCODABLE,
288 },
289 };
290
291 trace!(
292 "maximum accepted remaining length set to {:?}",
293 self.client_config.maximum_accepted_remaining_length
294 );
295
296 {
297 let packet_client_identifier = client_identifier
298 .as_ref()
299 .map(MqttString::as_borrowed)
300 .unwrap_or_default();
301
302 let mut packet = ConnectPacket::new(
303 packet_client_identifier,
304 options.clean_start,
305 options.keep_alive,
306 options.maximum_packet_size,
307 options.session_expiry_interval,
308 // Safety: `Self::new` panics if `RECEIVE_MAXIMUM` is 0. Thus, this
309 // code is only reached when `RECEIVE_MAXIMUM` is greater than 0.
310 unsafe { NonZero::new_unchecked(RECEIVE_MAXIMUM as u16) },
311 options.request_response_information,
312 );
313
314 if let Some(ref user_name) = options.user_name {
315 packet.add_user_name(user_name.as_borrowed());
316 }
317 if let Some(ref password) = options.password {
318 packet.add_password(password.as_borrowed());
319 }
320
321 if let Some(ref will) = options.will {
322 let will_qos = will.will_qos;
323 let will_retain = will.will_retain;
324
325 packet.add_will(will.as_borrowed_will(), will_qos, will_retain);
326 }
327
328 debug!("sending CONNECT packet");
329 self.raw.send(&packet).await?;
330 self.raw.flush().await?;
331 }
332
333 let header = self.raw.recv_header().await?;
334
335 match header.packet_type() {
336 Ok(ConnackPacket::PACKET_TYPE) => debug!(
337 "received CONNACK packet header (remaining length: {})",
338 header.remaining_len.value()
339 ),
340 Ok(t) => {
341 error!("received unexpected {:?} packet header", t);
342
343 self.raw.close_with(Some(ReasonCode::ProtocolError));
344 return Err(MqttError::Server);
345 }
346 Err(_) => {
347 error!("received invalid header {:?}", header);
348 self.raw.close_with(Some(ReasonCode::MalformedPacket));
349 return Err(MqttError::Server);
350 }
351 }
352
353 let ConnackPacket {
354 session_present,
355 reason_code,
356 session_expiry_interval,
357 receive_maximum,
358 maximum_qos,
359 retain_available,
360 maximum_packet_size,
361 assigned_client_identifier,
362 topic_alias_maximum,
363 reason_string,
364 wildcard_subscription_available,
365 subscription_identifier_available,
366 shared_subscription_available,
367 server_keep_alive,
368 response_information,
369 server_reference,
370 } = self.raw.recv_body(&header).await?;
371
372 if reason_code.is_success() {
373 debug!("CONNACK packet indicates success");
374
375 if !session_present && !options.clean_start {
376 info!("server does not have the requested session present.");
377 self.session.clear();
378 }
379
380 let client_identifier = assigned_client_identifier
381 .map(Property::into_inner)
382 .or(client_identifier)
383 .ok_or_else(|| {
384 error!("server did not assign a client identifier when it was required.");
385 self.raw.close_with(Some(ReasonCode::ProtocolError));
386 MqttError::Server
387 })?;
388
389 self.shared_config.session_expiry_interval =
390 session_expiry_interval.unwrap_or(options.session_expiry_interval);
391 self.shared_config.keep_alive =
392 server_keep_alive.map_or(options.keep_alive, Property::into_inner);
393
394 if let Some(r) = receive_maximum {
395 self.server_config.receive_maximum = r;
396 }
397 if let Some(m) = maximum_qos {
398 self.server_config.maximum_qos = m.into_inner();
399 }
400 if let Some(r) = retain_available {
401 self.server_config.retain_supported = r.into_inner();
402 }
403 if let Some(m) = maximum_packet_size {
404 self.server_config.maximum_packet_size = m;
405 }
406 if let Some(t) = topic_alias_maximum {
407 self.server_config.topic_alias_maximum = t.into_inner();
408 }
409 if let Some(w) = wildcard_subscription_available {
410 self.server_config.wildcard_subscription_supported = w.into_inner();
411 }
412 if let Some(s) = subscription_identifier_available {
413 self.server_config.subscription_identifiers_supported = s.into_inner();
414 }
415 if let Some(s) = shared_subscription_available {
416 self.server_config.shared_subscription_supported = s.into_inner();
417 }
418
419 info!("connected to server (session present: {})", session_present);
420
421 Ok(ConnectInfo {
422 session_present,
423 client_identifier,
424 response_information: response_information.map(Property::into_inner),
425 server_reference: server_reference.map(Property::into_inner),
426 })
427 } else {
428 debug!("CONNACK packet indicates rejection");
429 info!("connection rejected by server (reason: {:?})", reason_code);
430
431 self.raw.close_with(None);
432
433 info!("disconnected from server");
434
435 Err(MqttError::Disconnect {
436 reason: reason_code,
437 reason_string: reason_string.map(Property::into_inner),
438 server_reference: server_reference.map(Property::into_inner),
439 })
440 }
441 }
442
443 /// Start a ping handshake by sending a PINGRESP packet.
444 ///
445 /// # Errors
446 ///
447 /// * [`MqttError::RecoveryRequired`] if an unrecoverable error occured previously
448 /// * [`MqttError::Network`] if the underlying [`Transport`] returned an error
449 pub async fn ping(&mut self) -> Result<(), MqttError<'c>> {
450 debug!("sending PINGREQ packet");
451
452 // PINGREQ has length 2 which really shouldn't exceed server's max packet size.
453 // If it does the server should reconsider its incarnation as an MQTT server.
454 self.raw.send(&PingreqPacket::new()).await?;
455 self.raw.flush().await?;
456
457 Ok(())
458 }
459
460 /// Subscribes to a single topic with the given options.
461 ///
462 /// The client keeps track of the packet identifier sent in the SUBSCRIBE packet.
463 /// If no [`Event::Suback`] is received within a custom time,
464 /// this method can be used to send the SUBSCRIBE packet again.
465 ///
466 /// A subscription identifier should only be set if the server supports
467 /// subscription identifiers (Can be checked with [`Self::server_config`]).
468 /// The client does not double-check whether this feature is supported and will
469 /// always include the subscription identifier argument if present.
470 ///
471 /// # Returns:
472 /// The packet identifier of the sent SUBSCRIBE packet.
473 ///
474 /// # Errors
475 ///
476 /// * [`MqttError::RecoveryRequired`] if an unrecoverable error occured previously
477 /// * [`MqttError::Network`] if the underlying [`Transport`] returned an error
478 /// * [`MqttError::SessionBuffer`] if the buffer for outgoing SUBSCRIBE packet identifiers is full
479 /// * [`MqttError::ServerMaximumPacketSizeExceeded`] if the server's maximum packet size would be
480 /// exceeded by sending this SUBSCRIBE packet
481 pub async fn subscribe(
482 &mut self,
483 topic_filter: TopicFilter<'_>,
484 options: SubscriptionOptions,
485 ) -> Result<PacketIdentifier, MqttError<'c>> {
486 if self.pending_suback.len() == MAX_SUBSCRIBES {
487 info!("maximum concurrent subscriptions reached");
488 return Err(MqttError::SessionBuffer);
489 }
490
491 let subscribe_filter = SubscriptionFilter::new(topic_filter, &options);
492
493 let pid = self.packet_identifier();
494 let mut subscribe_filters = Vec::<_, 1>::new();
495 let _ = subscribe_filters.push(subscribe_filter);
496 let packet = SubscribePacket::new(pid, options.subscription_identifier, subscribe_filters)
497 .expect("SUBSCRIBE with a single topic can not exceed VarByteInt::MAX_ENCODABLE");
498
499 if self.server_config.maximum_packet_size.as_u32() < packet.encoded_len() as u32 {
500 return Err(MqttError::ServerMaximumPacketSizeExceeded);
501 }
502
503 debug!("sending SUBSCRIBE packet");
504
505 self.raw.send(&packet).await?;
506 self.raw.flush().await?;
507 self.pending_suback.push(pid).unwrap();
508
509 Ok(pid)
510 }
511
512 /// Unsubscribes from a single topic filter.
513 ///
514 /// The client keeps track of the packet identifier sent in the UNSUBSCRIBE packet.
515 /// If no [`Event::Unsuback`] is received within a custom time,
516 /// this method can be used to send the UNSUBSCRIBE packet again.
517 ///
518 /// # Returns:
519 /// The packet identifier of the sent UNSUBSCRIBE packet.
520 ///
521 /// # Errors
522 ///
523 /// * [`MqttError::RecoveryRequired`] if an unrecoverable error occured previously
524 /// * [`MqttError::Network`] if the underlying [`Transport`] returned an error
525 /// * [`MqttError::SessionBuffer`] if the buffer for outgoing UNSUBSCRIBE packet identifiers is full
526 /// * [`MqttError::ServerMaximumPacketSizeExceeded`] if the server's maximum packet size would be
527 /// exceeded by sending this UNSUBSCRIBE packet
528 pub async fn unsubscribe(
529 &mut self,
530 topic_filter: TopicFilter<'_>,
531 ) -> Result<PacketIdentifier, MqttError<'c>> {
532 if self.pending_unsuback.len() == MAX_SUBSCRIBES {
533 info!("maximum concurrent unsubscriptions reached");
534 return Err(MqttError::SessionBuffer);
535 }
536
537 let pid = self.packet_identifier();
538 let mut topic_filters = Vec::<_, 1>::new();
539 let _ = topic_filters.push(topic_filter);
540 let packet = UnsubscribePacket::new(pid, topic_filters)
541 .expect("UNSUBSCRIBE with a single topic cannot exceed VarByteInt::MAX_ENCODABLE");
542
543 if self.server_config.maximum_packet_size.as_u32() < packet.encoded_len() as u32 {
544 return Err(MqttError::ServerMaximumPacketSizeExceeded);
545 }
546
547 debug!("sending UNSUBSCRIBE packet");
548
549 self.raw.send(&packet).await?;
550 self.raw.flush().await?;
551 self.pending_unsuback.push(pid).unwrap();
552
553 Ok(pid)
554 }
555
556 /// Publish a message. If [`QoS`] is greater than 0, the packet identifier is also kept track of by the client
557 ///
558 /// # Returns:
559 /// - In case of [`QoS`] 0: [`None`]
560 /// - In case of [`QoS`] 1 or 2: [`Some`] with the packet identifier of the published packet
561 ///
562 /// # Errors
563 ///
564 /// * [`MqttError::RecoveryRequired`] if an unrecoverable error occured previously
565 /// * [`MqttError::Network`] if the underlying [`Transport`] returned an error
566 /// * [`MqttError::SendQuotaExceeded`] if the server's control flow limit is reached and sending
567 /// the PUBLISH would exceed the limit causing a protocol error
568 /// * [`MqttError::SessionBuffer`] if the buffer for outgoing PUBLISH packet identifiers is full
569 /// * [`MqttError::InvalidTopicAlias`] if a topic alias is used and
570 /// * its value is 0
571 /// * its value is greater than the server's maximum topic alias
572 /// * [`MqttError::PacketMaximumLengthExceeded`] if the PUBLISH packet is too long to be encoded
573 /// with MQTT's [`VarByteInt`]
574 /// * [`MqttError::ServerMaximumPacketSizeExceeded`] if the server's maximum packet size would be
575 /// exceeded by sending this PUBLISH packet
576 pub async fn publish(
577 &mut self,
578 options: &PublicationOptions<'_>,
579 message: Bytes<'_>,
580 ) -> Result<Option<PacketIdentifier>, MqttError<'c>> {
581 if options.qos > QoS::AtMostOnce {
582 if self.remaining_send_quota() == 0 {
583 info!("server receive maximum reached");
584 return Err(MqttError::SendQuotaExceeded);
585 }
586 if self.session.cpublish_remaining_capacity() == 0 {
587 info!("client maximum concurrent publications reached");
588 return Err(MqttError::SessionBuffer);
589 }
590 }
591
592 let identified_qos = match options.qos {
593 QoS::AtMostOnce => IdentifiedQoS::AtMostOnce,
594 QoS::AtLeastOnce => IdentifiedQoS::AtLeastOnce(self.packet_identifier()),
595 QoS::ExactlyOnce => IdentifiedQoS::ExactlyOnce(self.packet_identifier()),
596 };
597
598 if options
599 .topic
600 .alias()
601 .is_some_and(|a| !(1..=self.server_config.topic_alias_maximum).contains(&a))
602 {
603 return Err(MqttError::InvalidTopicAlias);
604 }
605
606 let packet: PublishPacket<'_, 0> = PublishPacket::new(
607 false,
608 identified_qos,
609 options.retain,
610 options.topic.as_borrowed(),
611 options.payload_format_indicator.map(Into::into),
612 options.message_expiry_interval.map(Into::into),
613 options.response_topic.as_ref().map(TopicName::as_borrowed),
614 options
615 .correlation_data
616 .as_ref()
617 .map(MqttBinary::as_borrowed),
618 options
619 .content_type
620 .as_ref()
621 .map(MqttString::as_borrowed)
622 .map(Into::into),
623 message,
624 )?;
625
626 if self.server_config.maximum_packet_size.as_u32() < packet.encoded_len() as u32 {
627 return Err(MqttError::ServerMaximumPacketSizeExceeded);
628 }
629
630 // Treat the packet as sent before successfully sending. In case of a network error,
631 // we have tracked the packet as in flight and can republish it.
632 let pid = match identified_qos {
633 IdentifiedQoS::AtMostOnce => None,
634 IdentifiedQoS::AtLeastOnce(packet_identifier) => Some({
635 // Safety: `cpublish_remaining_capacity()` > 0 confirms that there is space.
636 unsafe { self.session.await_puback(packet_identifier) };
637 packet_identifier
638 }),
639 IdentifiedQoS::ExactlyOnce(packet_identifier) => Some({
640 // Safety: `cpublish_remaining_capacity()` > 0 confirms that there is space.
641 unsafe { self.session.await_pubrec(packet_identifier) };
642 packet_identifier
643 }),
644 };
645
646 match identified_qos.packet_identifier() {
647 Some(pid) => debug!("sending PUBLISH packet with packet identifier {}", pid),
648 None => debug!("sending PUBLISH packet"),
649 }
650
651 self.raw.send(&packet).await?;
652 self.raw.flush().await?;
653
654 Ok(pid)
655 }
656
657 /// Resends a PUBLISH packet with DUP flag set.
658 ///
659 /// This method must be called and must only be called after a reconnection with clean start set to 0,
660 /// as resending packets at any other time is a protocol error.
661 /// (Compare [Message delivery retry](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901238), \[MQTT-4.4.0-1\]).
662 ///
663 /// For a packet to be resent:
664 /// - it must have a quality of service > 0
665 /// - its packet identifier must have an in flight entry with a quality of service matching the
666 /// quality of service in the options parameter
667 /// - in case of quality of service 2, it must not already be awaiting a PUBCOMP packet
668 ///
669 /// # Errors
670 ///
671 /// * [`MqttError::RecoveryRequired`] if an unrecoverable error occured previously
672 /// * [`MqttError::Network`] if the underlying [`Transport`] returned an error
673 /// * [`MqttError::RepublishQoSNotMatching`] if the [`QoS`] of this republish does not match the
674 /// [`QoS`] that this packet identifier was originally published with
675 /// * [`MqttError::PacketIdentifierAwaitingPubcomp`] if a PUBREC packet with this packet identifier
676 /// has already been received and the server has therefore already received the PUBLISH
677 /// * [`MqttError::PacketIdentifierNotInFlight`] if this packet identifier is not tracked in the
678 /// client's session
679 /// * [`MqttError::InvalidTopicAlias`] if a topic alias is used and
680 /// * its value is 0
681 /// * its value is greater than the server's maximum topic alias
682 /// * [`MqttError::PacketMaximumLengthExceeded`] if the PUBLISH packet is too long to be encoded
683 /// with MQTT's [`VarByteInt`]
684 /// * [`MqttError::ServerMaximumPacketSizeExceeded`] if the server's maximum packet size would be
685 /// exceeded by sending this PUBLISH packet
686 ///
687 /// # Panics
688 ///
689 /// This function may panic if the [`QoS`] in the `options` is [`QoS::AtMostOnce`].
690 pub async fn republish(
691 &mut self,
692 packet_identifier: PacketIdentifier,
693 options: &PublicationOptions<'_>,
694 message: Bytes<'_>,
695 ) -> Result<(), MqttError<'c>> {
696 if options.qos == QoS::AtMostOnce {
697 panic!("QoS 0 packets cannot be republished");
698 }
699
700 let identified_qos = match self.session.cpublish_flight_state(packet_identifier) {
701 Some(CPublishFlightState::AwaitingPuback) if options.qos == QoS::AtLeastOnce => {
702 IdentifiedQoS::AtLeastOnce(packet_identifier)
703 }
704 Some(CPublishFlightState::AwaitingPubrec) if options.qos == QoS::ExactlyOnce => {
705 IdentifiedQoS::ExactlyOnce(packet_identifier)
706 }
707
708 Some(CPublishFlightState::AwaitingPuback) => {
709 warn!(
710 "packet identifier {} was originally published with QoS 1",
711 packet_identifier
712 );
713 return Err(MqttError::RepublishQoSNotMatching);
714 }
715 Some(CPublishFlightState::AwaitingPubrec) => {
716 warn!(
717 "packet identifier {} was originally published with QoS 2",
718 packet_identifier
719 );
720 return Err(MqttError::RepublishQoSNotMatching);
721 }
722 Some(CPublishFlightState::AwaitingPubcomp) => {
723 warn!(
724 "packet identifier {} is already awaiting PUBCOMP",
725 packet_identifier
726 );
727 return Err(MqttError::PacketIdentifierAwaitingPubcomp);
728 }
729 None => {
730 warn!("packet identifier {} not in flight", packet_identifier);
731 return Err(MqttError::PacketIdentifierNotInFlight);
732 }
733 };
734
735 if options
736 .topic
737 .alias()
738 .is_some_and(|a| !(1..=self.server_config.topic_alias_maximum).contains(&a))
739 {
740 return Err(MqttError::InvalidTopicAlias);
741 }
742
743 let packet: PublishPacket<'_, 0> = PublishPacket::new(
744 true,
745 identified_qos,
746 options.retain,
747 options.topic.as_borrowed(),
748 options.payload_format_indicator.map(Into::into),
749 options.message_expiry_interval.map(Into::into),
750 options.response_topic.as_ref().map(TopicName::as_borrowed),
751 options
752 .correlation_data
753 .as_ref()
754 .map(MqttBinary::as_borrowed),
755 options
756 .content_type
757 .as_ref()
758 .map(MqttString::as_borrowed)
759 .map(Into::into),
760 message,
761 )?;
762
763 if self.server_config.maximum_packet_size.as_u32() < packet.encoded_len() as u32 {
764 return Err(MqttError::ServerMaximumPacketSizeExceeded);
765 }
766
767 // We only republish a message if its quality of service and flight state is correct.
768 // In this case, we don't have to change its in flight tracking state as it already
769 // is in the desired state.
770
771 debug!(
772 "resending PUBLISH packet with packet identifier {}",
773 packet_identifier
774 );
775
776 self.raw.send(&packet).await?;
777 self.raw.flush().await?;
778
779 Ok(())
780 }
781
782 /// Resends all pending PUBREL packets.
783 ///
784 /// This method must be called and must only be called after a reconnection
785 /// with clean start set to 0, as resending packets at any other time is a protocol error.
786 /// (Compare [Message delivery retry](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901238), \[MQTT-4.4.0-1\]).
787 ///
788 /// This method assumes that the server's receive maximum after the reconnection is great enough
789 /// to handle as many publication flows as dragged between the two connections.
790 ///
791 /// # Errors
792 ///
793 /// * [`MqttError::RecoveryRequired`] if an unrecoverable error occured previously
794 /// * [`MqttError::Network`] if the underlying [`Transport`] returned an error
795 pub async fn rerelease(&mut self) -> Result<(), MqttError<'c>> {
796 for packet_identifier in self
797 .session
798 .pending_client_publishes
799 .iter()
800 .filter(|s| s.state == CPublishFlightState::AwaitingPubcomp)
801 .map(|p| p.packet_identifier)
802 {
803 let pubrel = PubrelPacket::new(packet_identifier, ReasonCode::Success);
804
805 // Don't check whether length exceeds servers maximum packet size because we don't
806 // add properties to PUBREL packets -> length is always minimal at 6 bytes.
807 // The server really shouldn't reject this.
808 self.raw.send(&pubrel).await?;
809 }
810
811 self.raw.flush().await?;
812
813 Ok(())
814 }
815
816 /// Disconnects from the server after an error occured in a situation-aware way by either:
817 /// - dropping the connection
818 /// - sending a DISCONNECT with the deposited reason code and dropping the connection.
819 ///
820 /// After an MQTT communication fails, usually either the client or the server closes the connection.
821 ///
822 /// This is not cancel-safe but you can set a timeout if reconnecting later anyway or you don't reuse the client.
823 #[inline]
824 pub async fn abort(&mut self) {
825 match self.raw.abort().await {
826 Ok(()) => info!("connection aborted"),
827 Err(e) => warn!("connection abort failed: {:?}", e),
828 }
829 }
830
831 /// Disconnects gracefully from the server by sending a DISCONNECT packet.
832 ///
833 /// # Preconditions:
834 /// - The client did not return a non-recoverable Error before
835 ///
836 /// # Errors
837 ///
838 /// * [`MqttError::RecoveryRequired`] if an unrecoverable error occured previously
839 /// * [`MqttError::Network`] if the underlying [`Transport`] returned an error
840 /// * [`MqttError::IllegalDisconnectSessionExpiryInterval`] if the session expiry interval in the
841 /// CONNECT packet was zero and the session expiry interval in the [`DisconnectOptions`] is [`Some`]
842 /// and not [`SessionExpiryInterval::EndOnDisconnect`].
843 pub async fn disconnect(&mut self, options: &DisconnectOptions) -> Result<(), MqttError<'c>> {
844 let connect_session_expiry_interval_was_zero =
845 self.client_config.session_expiry_interval == SessionExpiryInterval::EndOnDisconnect;
846 let disconnect_session_expiry_interval_is_non_zero = options
847 .session_expiry_interval
848 .is_some_and(|s| s != SessionExpiryInterval::EndOnDisconnect);
849
850 if connect_session_expiry_interval_was_zero
851 && disconnect_session_expiry_interval_is_non_zero
852 {
853 return Err(MqttError::IllegalDisconnectSessionExpiryInterval);
854 }
855
856 let reason_code = if options.publish_will {
857 ReasonCode::DisconnectWithWillMessage
858 } else {
859 ReasonCode::Success
860 };
861
862 let mut packet = DisconnectPacket::new(reason_code);
863 if let Some(s) = options.session_expiry_interval {
864 packet.add_session_expiry_interval(s);
865 }
866
867 debug!("sending DISCONNECT packet");
868
869 // Don't check whether length exceeds servers maximum packet size because we don't
870 // add a reason string to the DISCONNECT packet -> length is always in the 4..=9 range in bytes.
871 // The server really shouldn't reject this.
872 self.raw.send(&packet).await?;
873 self.raw.flush().await?;
874
875 // Terminates (closes) the connection by dropping it
876 self.raw.close_with(None);
877
878 info!("disconnected from server");
879
880 Ok(())
881 }
882
883 /// Combines [`Self::poll_header`] and [`Self::poll_body`].
884 ///
885 /// Polls the network for a full packet. Not cancel-safe.
886 ///
887 /// # Preconditions:
888 /// - The last MQTT packet was received completely
889 /// - The client did not return a non-recoverable Error before
890 ///
891 /// # Returns:
892 /// MQTT Events. Their further meaning is documented in [`Event`].
893 ///
894 /// # Errors
895 ///
896 /// Returns the errors that [`Client::poll_header`] and [`Client::poll_body`] return.
897 /// For further information view their docs.
898 pub async fn poll(&mut self) -> Result<Event<'c, MAX_SUBSCRIPTION_IDENTIFIERS>, MqttError<'c>> {
899 let header = self.poll_header().await?;
900 self.poll_body(header).await
901 }
902
903 /// Polls the network for a fixed header in a cancel-safe way.
904 ///
905 /// If a fixed header is received, the first 4 bits (packet type) are checked for correctness.
906 ///
907 /// # Preconditions:
908 /// - The last MQTT packet was received completely
909 /// - The client did not return a non-recoverable Error before
910 ///
911 /// # Returns:
912 /// The received fixed header with a valid packet type. It can be used to call [`Self::poll_body`].
913 ///
914 /// # Errors
915 ///
916 /// * [`MqttError::RecoveryRequired`] if an unrecoverable error occured previously
917 /// * [`MqttError::Network`] if the underlying [`Transport`] returned an error
918 /// * [`MqttError::Server`] if:
919 /// * the server sends a malformed packet header
920 /// * the packet following this header exceeds the client's maximum packet size
921 pub async fn poll_header(&mut self) -> Result<FixedHeader, MqttError<'c>> {
922 let header = self.raw.recv_header().await?;
923
924 if let Ok(p) = header.packet_type() {
925 debug!(
926 "received {:?} packet header (remaining length: {})",
927 p,
928 header.remaining_len.value()
929 );
930 } else {
931 error!("received invalid header {:?}", header);
932 self.raw.close_with(Some(ReasonCode::MalformedPacket));
933 return Err(MqttError::Server);
934 }
935
936 if header.remaining_len.value() > self.client_config.maximum_accepted_remaining_length {
937 error!(
938 "received a packet exceeding maximum packet size, remaining length={:?}",
939 header.remaining_len.value()
940 );
941 self.raw.close_with(Some(ReasonCode::PacketTooLarge));
942 return Err(MqttError::Server);
943 }
944
945 Ok(header)
946 }
947
948 /// Polls the network for the variable header and payload of a packet. Not cancel-safe.
949 ///
950 /// # Preconditions:
951 /// - The [`FixedHeader`] argument was received from the network right before.
952 /// - The client did not return a non-recoverable [`MqttError`] before
953 ///
954 /// # Returns:
955 /// MQTT Events for regular communication. Their further meaning is documented in [`Event`].
956 ///
957 /// # Errors
958 ///
959 /// * [`MqttError::RecoveryRequired`] if an unrecoverable error occured previously
960 /// * [`MqttError::Network`] if the underlying [`Transport`] returned an error
961 /// * [`MqttError::Alloc`] if the underlying [`BufferProvider`] returned an error
962 /// * [`MqttError::Server`] if:
963 /// * the server sends a malformed packet
964 /// * the server causes a protocol error
965 /// * the packet following this header exceeds the client's maximum packet size
966 /// * the server sends a PUBLISH packet with an invalid topic alias
967 /// * the server exceeded the client's receive maximum with a new [`QoS`] 2 PUBLISH
968 /// * the server sends a PUBACK/PUBREC/PUBREL/PUBCOMP packet which mismatches what
969 /// the client expects for this packet identifier from its session state
970 /// * the fixed header has the packet type CONNECT/SUBSCRIBE/UNSUBSCRIBE/PINGREQ
971 /// * [`MqttError::Disconnect`] if a DISCONNECT packet is received
972 /// * [`MqttError::AuthPacketReceived`] if the fixed header has the packet type AUTH
973 pub async fn poll_body(
974 &mut self,
975 header: FixedHeader,
976 ) -> Result<Event<'c, MAX_SUBSCRIPTION_IDENTIFIERS>, MqttError<'c>> {
977 let event = match header.packet_type()? {
978 PacketType::Pingresp => {
979 self.raw.recv_body::<PingrespPacket>(&header).await?;
980 Event::Pingresp
981 }
982 PacketType::Suback => {
983 // We only send SUBSCRIBE packets with exactly 1 topic
984 // -> Packets with more than 1 reason code are currently rejected by the RxPacket::receive implementation
985 // with RxError::Protocol error. This is correct as long as we only send SUBSCRIBE packets with 1 topic.
986 let suback = self.raw.recv_body::<SubackPacket<'_, 1>>(&header).await?;
987 let pid = suback.packet_identifier;
988
989 if Self::remove_packet_identifier_if_exists(&mut self.pending_suback, pid) {
990 // We only send SUBSCRIBE packets with exactly 1 topic
991 if suback.reason_codes.len() != 1 {
992 error!("received mismatched SUBACK");
993 self.raw.close_with(Some(ReasonCode::ProtocolError));
994 return Err(MqttError::Server);
995 }
996
997 let r = suback.reason_codes.first().unwrap();
998
999 Event::Suback(Suback {
1000 packet_identifier: pid,
1001 reason_code: *r,
1002 })
1003 } else {
1004 debug!("packet identifier {} in SUBACK not in use", pid);
1005 Event::Ignored
1006 }
1007 }
1008 PacketType::Unsuback => {
1009 // We only send UNSUBSCRIBE packets with exactly 1 topic
1010 // -> Packets with more than 1 reason code are currently rejected by the RxPacket::receive implementation
1011 // with RxError::Protocol error. This is correct as long as we only send UNSUBSCRIBE packets with 1 topic.
1012 let unsuback = self.raw.recv_body::<UnsubackPacket<'_, 1>>(&header).await?;
1013 let pid = unsuback.packet_identifier;
1014
1015 if Self::remove_packet_identifier_if_exists(&mut self.pending_unsuback, pid) {
1016 // We only send UNSUBSCRIBE packets with exactly 1 topic
1017 if unsuback.reason_codes.len() != 1 {
1018 error!("received mismatched UNSUBACK");
1019 self.raw.close_with(Some(ReasonCode::ProtocolError));
1020 return Err(MqttError::Server);
1021 }
1022
1023 let r = unsuback.reason_codes.first().unwrap();
1024
1025 Event::Unsuback(Suback {
1026 packet_identifier: pid,
1027 reason_code: *r,
1028 })
1029 } else {
1030 debug!("packet identifier {} in UNSUBACK not in use", pid);
1031 Event::Ignored
1032 }
1033 }
1034 PacketType::Publish => {
1035 let publish = self
1036 .raw
1037 .recv_body::<PublishPacket<'_, MAX_SUBSCRIPTION_IDENTIFIERS>>(&header)
1038 .await?;
1039
1040 // Our topic alias maximum is always 0, the moment we receive a topic alias, this is an error.
1041 let TopicReference::Name(topic) = publish.topic else {
1042 error!("received disallowed topic alias");
1043 self.raw.close_with(Some(ReasonCode::TopicAliasInvalid));
1044 return Err(MqttError::Server);
1045 };
1046
1047 let publish = Publish {
1048 dup: publish.dup,
1049 identified_qos: publish.identified_qos,
1050 retain: publish.retain,
1051 topic,
1052 payload_format_indicator: publish
1053 .payload_format_indicator
1054 .map(Property::into_inner),
1055 message_expiry_interval: publish
1056 .message_expiry_interval
1057 .map(Property::into_inner),
1058 response_topic: publish.response_topic.map(Property::into_inner),
1059 correlation_data: publish.correlation_data.map(Property::into_inner),
1060 subscription_identifiers: publish
1061 .subscription_identifiers
1062 .into_iter()
1063 .map(Property::into_inner)
1064 .collect(),
1065 content_type: publish.content_type.map(Property::into_inner),
1066 message: publish.message,
1067 };
1068
1069 match publish.identified_qos {
1070 IdentifiedQoS::AtMostOnce => {
1071 debug!("received QoS 0 publication");
1072
1073 Event::Publish(publish)
1074 }
1075 IdentifiedQoS::AtLeastOnce(pid) => {
1076 debug!("received QoS 1 publication with packet identifier {}", pid);
1077
1078 // We could disconnect here using ReasonCode::ReceiveMaximumExceeded, but incoming QoS 1 publications
1079 // don't require resources outside of this scope which means we can just accept these packets.
1080
1081 let puback = PubackPacket::new(pid, ReasonCode::Success);
1082
1083 debug!("sending PUBACK packet");
1084
1085 // Don't check whether length exceeds servers maximum packet size because we don't
1086 // add properties to PUBACK packets -> length is always minimal at 6 bytes.
1087 // The server really shouldn't reject this.
1088 self.raw.send(&puback).await?;
1089 self.raw.flush().await?;
1090
1091 Event::Publish(publish)
1092 }
1093 IdentifiedQoS::ExactlyOnce(pid) => {
1094 debug!("received QoS 2 publication with packet identifier {}", pid);
1095
1096 let event = match self.session.spublish_flight_state(pid) {
1097 Some(SPublishFlightState::AwaitingPubrel) => Event::Duplicate,
1098 None if self.session.spublish_remaining_capacity() > 0 => {
1099 // Safety: `spublish_remaining_capacity()` > 0 confirms that there is space.
1100 unsafe { self.session.await_pubrel(pid) };
1101 Event::Publish(publish)
1102 }
1103 None => {
1104 error!("server exceeded receive maximum");
1105 self.raw
1106 .close_with(Some(ReasonCode::ReceiveMaximumExceeded));
1107 return Err(MqttError::Server);
1108 }
1109 };
1110
1111 let pubrec = PubrecPacket::new(pid, ReasonCode::Success);
1112
1113 debug!("sending PUBREC packet");
1114
1115 // Don't check whether length exceeds servers maximum packet size because we don't
1116 // add properties to PUBREC packets -> length is always minimal at 6 bytes.
1117 // The server really shouldn't reject this.
1118 self.raw.send(&pubrec).await?;
1119 self.raw.flush().await?;
1120
1121 event
1122 }
1123 }
1124 }
1125 PacketType::Puback => {
1126 let puback = self.raw.recv_body::<PubackPacket>(&header).await?;
1127 let pid = puback.packet_identifier;
1128 let reason_code = puback.reason_code;
1129
1130 match self.session.remove_cpublish(pid) {
1131 Some(CPublishFlightState::AwaitingPuback) if reason_code.is_success() => {
1132 debug!("publication with packet identifier {} complete", pid);
1133
1134 Event::PublishAcknowledged(Puback {
1135 packet_identifier: pid,
1136 reason_code,
1137 })
1138 }
1139 Some(CPublishFlightState::AwaitingPuback) => {
1140 debug!("publication with packet identifier {} aborted", pid);
1141
1142 Event::PublishRejected(Pubrej {
1143 packet_identifier: pid,
1144 reason_code,
1145 })
1146 }
1147 Some(
1148 s @ CPublishFlightState::AwaitingPubrec
1149 | s @ CPublishFlightState::AwaitingPubcomp,
1150 ) => {
1151 warn!("packet identifier {} in PUBACK is actually {:?}", pid, s);
1152
1153 // Readd this packet identifier to the session so that it can be republished
1154 // after reconnecting.
1155
1156 // Safety: Session::remove_cpublish returning Some and therefore successfully
1157 // removing a cpublish frees space to add a new in flight entry.
1158 unsafe { self.session.r#await(pid, s) };
1159
1160 error!("received mismatched PUBACK");
1161 self.raw.close_with(Some(ReasonCode::ProtocolError));
1162 return Err(MqttError::Server);
1163 }
1164 None => {
1165 debug!("packet identifier {} in PUBACK not in use", pid);
1166 Event::Ignored
1167 }
1168 }
1169 }
1170 PacketType::Pubrec => {
1171 let pubrec = self.raw.recv_body::<PubrecPacket>(&header).await?;
1172 let pid = pubrec.packet_identifier;
1173 let reason_code = pubrec.reason_code;
1174
1175 match self.session.remove_cpublish(pid) {
1176 Some(CPublishFlightState::AwaitingPubrec) if reason_code.is_success() => {
1177 // Safety: Session::remove_cpublish returning Some and therefore successfully
1178 // removing a cpublish frees space to add a new in flight entry.
1179 unsafe { self.session.await_pubcomp(pid) };
1180
1181 let pubrel = PubrelPacket::new(pid, ReasonCode::Success);
1182
1183 debug!("sending PUBREL packet");
1184
1185 // Don't check whether length exceeds servers maximum packet size because we don't
1186 // add properties to PUBREL packets -> length is always minimal at 6 bytes.
1187 // The server really shouldn't reject this.
1188 self.raw.send(&pubrel).await?;
1189 self.raw.flush().await?;
1190
1191 Event::PublishReceived(Puback {
1192 packet_identifier: pid,
1193 reason_code,
1194 })
1195 }
1196 Some(CPublishFlightState::AwaitingPubrec) => {
1197 // After receiving an erroneous PUBREC, we have to treat any subsequent PUBLISH packet
1198 // with the same packet identifier as a new message. This is achieved by already having
1199 // removed the packet identifier's in flight entry.
1200
1201 debug!("publication with packet identifier {} aborted", pid);
1202
1203 Event::PublishRejected(Pubrej {
1204 packet_identifier: pid,
1205 reason_code,
1206 })
1207 }
1208 Some(
1209 s @ CPublishFlightState::AwaitingPuback
1210 | s @ CPublishFlightState::AwaitingPubcomp,
1211 ) => {
1212 warn!("packet identifier {} in PUBREC is actually {:?}", pid, s);
1213
1214 // Readd this packet identifier to the session so that it can be republished
1215 // after reconnecting.
1216
1217 // Safety: Session::remove_cpublish returning Some and therefore successfully
1218 // removing a cpublish frees space to add a new in flight entry.
1219 unsafe { self.session.r#await(pid, s) };
1220
1221 error!("received mismatched PUBREC");
1222 self.raw.close_with(Some(ReasonCode::ProtocolError));
1223 return Err(MqttError::Server);
1224 }
1225 None => {
1226 debug!("packet identifier {} in PUBREC not in use", pid);
1227
1228 let pubrel = PubrelPacket::new(pid, ReasonCode::PacketIdentifierNotFound);
1229
1230 debug!("sending PUBREL packet");
1231
1232 // Don't check whether length exceeds servers maximum packet size because we don't
1233 // add properties to PUBREL packets -> length is always minimal at 6 bytes.
1234 // The server really shouldn't reject this.
1235 self.raw.send(&pubrel).await?;
1236 self.raw.flush().await?;
1237
1238 Event::Ignored
1239 }
1240 }
1241 }
1242 PacketType::Pubrel => {
1243 let pubrel = self.raw.recv_body::<PubrelPacket>(&header).await?;
1244 let pid = pubrel.packet_identifier;
1245 let reason_code = pubrel.reason_code;
1246
1247 match self.session.remove_spublish(pid) {
1248 Some(SPublishFlightState::AwaitingPubrel) if reason_code.is_success() => {
1249 let pubcomp = PubcompPacket::new(pid, ReasonCode::Success);
1250
1251 debug!("sending PUBCOMP packet");
1252
1253 // Don't check whether length exceeds servers maximum packet size because we don't
1254 // add properties to PUBCOMP packets -> length is always minimal at 6 bytes.
1255 // The server really shouldn't reject this.
1256 self.raw.send(&pubcomp).await?;
1257 self.raw.flush().await?;
1258
1259 Event::PublishReleased(Puback {
1260 packet_identifier: pid,
1261 reason_code,
1262 })
1263 }
1264 Some(SPublishFlightState::AwaitingPubrel) => {
1265 debug!("publication with packet identifier {} aborted", pid);
1266
1267 Event::PublishRejected(Pubrej {
1268 packet_identifier: pid,
1269 reason_code,
1270 })
1271 }
1272 None => {
1273 debug!("packet identifier {} in PUBREL not in use", pid);
1274
1275 let pubcomp = PubcompPacket::new(pid, ReasonCode::PacketIdentifierNotFound);
1276
1277 debug!("sending PUBCOMP packet");
1278
1279 // Don't check whether length exceeds servers maximum packet size because we don't
1280 // add properties to PUBCOMP packets -> length is always minimal at 6 bytes.
1281 // The server really shouldn't reject this.
1282 self.raw.send(&pubcomp).await?;
1283 self.raw.flush().await?;
1284
1285 Event::Ignored
1286 }
1287 }
1288 }
1289 PacketType::Pubcomp => {
1290 let pubcomp = self.raw.recv_body::<PubcompPacket>(&header).await?;
1291 let pid = pubcomp.packet_identifier;
1292 let reason_code = pubcomp.reason_code;
1293
1294 match self.session.remove_cpublish(pid) {
1295 Some(CPublishFlightState::AwaitingPubcomp) if reason_code.is_success() => {
1296 debug!("publication with packet identifier {} complete", pid);
1297
1298 Event::PublishComplete(Puback {
1299 packet_identifier: pid,
1300 reason_code: pubcomp.reason_code,
1301 })
1302 }
1303 Some(CPublishFlightState::AwaitingPubcomp) => {
1304 debug!("publication with packet identifier {} aborted", pid);
1305
1306 Event::PublishRejected(Pubrej {
1307 packet_identifier: pid,
1308 reason_code,
1309 })
1310 }
1311 Some(
1312 s @ CPublishFlightState::AwaitingPuback
1313 | s @ CPublishFlightState::AwaitingPubrec,
1314 ) => {
1315 warn!("packet identifier {} in PUBCOMP is actually {:?}", pid, s);
1316
1317 // Readd this packet identifier to the session so that it can be republished
1318 // after reconnecting.
1319
1320 // Safety: Session::remove_cpublish returning Some and therefore successfully
1321 // removing a cpublish frees space to add a new in flight entry.
1322 unsafe { self.session.r#await(pid, s) };
1323
1324 error!("received mismatched PUBCOMP");
1325 self.raw.close_with(Some(ReasonCode::ProtocolError));
1326 return Err(MqttError::Server);
1327 }
1328 None => {
1329 debug!("packet identifier {} in PUBCOMP not in use", pid);
1330 Event::Ignored
1331 }
1332 }
1333 }
1334 PacketType::Disconnect => {
1335 let disconnect = self.raw.recv_body::<DisconnectPacket>(&header).await?;
1336
1337 return Err(MqttError::Disconnect {
1338 reason: disconnect.reason_code,
1339 reason_string: disconnect.reason_string.map(Property::into_inner),
1340 server_reference: disconnect.server_reference.map(Property::into_inner),
1341 });
1342 }
1343 t @ (PacketType::Connect
1344 | PacketType::Subscribe
1345 | PacketType::Unsubscribe
1346 | PacketType::Pingreq) => {
1347 error!(
1348 "received a packet that the server is not allowed to send: {:?}",
1349 t
1350 );
1351
1352 self.raw.close_with(Some(ReasonCode::ProtocolError));
1353 return Err(MqttError::Server);
1354 }
1355 PacketType::Connack => {
1356 error!("received unexpected CONNACK packet");
1357
1358 self.raw.close_with(Some(ReasonCode::ProtocolError));
1359 return Err(MqttError::Server);
1360 }
1361 PacketType::Auth => {
1362 error!("received unexpected AUTH packet");
1363
1364 // Receiving a AUTH packet is currently always a protocol error because we never send
1365 // an Authentication Method property in the CONNECT packet.
1366 // <https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901217>
1367 self.raw.close_with(Some(ReasonCode::ProtocolError));
1368 return Err(MqttError::AuthPacketReceived);
1369 }
1370 };
1371
1372 Ok(event)
1373 }
1374}