Skip to main content

mountain_mqtt/
client_state.rs

1use core::fmt::{Display, Formatter};
2
3use heapless::Vec;
4
5use crate::{
6    data::{
7        packet_identifier::{PacketIdentifier, PublishPacketIdentifier},
8        property::{ConnackProperty, Property, PublishProperty},
9        quality_of_service::QualityOfService,
10        reason_code::{
11            ConnectReasonCode, PublishReasonCode, SubscribeReasonCode, UnsubscribeReasonCode,
12        },
13    },
14    error::{PacketReadError, PacketWriteError},
15    packets::{
16        connect::Connect,
17        disconnect::Disconnect,
18        packet_generic::PacketGeneric,
19        pingreq::Pingreq,
20        puback::Puback,
21        publish::Publish,
22        subscribe::{Subscribe, SubscriptionRequest},
23        unsubscribe::Unsubscribe,
24    },
25};
26
/// [ClientState] error
///
/// Covers failures to read or write packets, operations requested in a state
/// that does not permit them, packets received that are unexpected for the
/// current state, and error reason codes reported by the server.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum ClientStateError {
    /// A packet could not be written, wraps the underlying [PacketWriteError]
    PacketWrite(PacketWriteError),
    /// A packet could not be read, wraps the underlying [PacketReadError]
    PacketRead(PacketReadError),
    /// `connect` was called when the state was not idle
    NotIdle,
    /// An Auth packet was received, these are not supported
    AuthNotSupported,
    /// A publish or subscribe was requested at quality of service 2, which is not supported
    Qos2NotSupported,
    /// A Publish packet was received with a quality of service 2 packet identifier,
    /// which is not supported
    ReceivedQos2PublishNotSupported,
    /// An operation that needs an acknowledgement was requested while an earlier
    /// acknowledgement is still outstanding
    ClientIsWaitingForResponse,
    /// An operation that requires a connection was requested when not connected
    NotConnected,
    /// A packet was provided to `receive` when neither connecting nor connected
    ReceiveWhenNotConnectedOrConnecting,
    /// A Puback was received when not waiting for one
    UnexpectedPuback,
    /// A Puback was received while waiting for one, but its packet identifier did not match
    UnexpectedPubackPacketIdentifier,
    /// A Suback was received when not waiting for one
    UnexpectedSuback,
    /// A Suback was received while waiting for one, but its packet identifier did not match
    UnexpectedSubackPacketIdentifier,
    /// An Unsuback was received when not waiting for one
    UnexpectedUnsuback,
    /// An Unsuback was received while waiting for one, but its packet identifier did not match
    UnexpectedUnsubackPacketIdentifier,
    /// A Pingresp was received when no ping was pending
    UnexpectedPingresp,
    /// NOTE(review): not produced by any code visible in this file - presumably
    /// reports a disconnection, confirm against the users of this error
    Disconnect,
    /// A packet was received that is not expected from a server (Connect, Pubrec,
    /// Pubrel, Pubcomp, Subscribe, Unsubscribe or Pingreq)
    ServerOnlyMessageReceived,
    /// While connecting, a packet other than Connack or Auth was received
    ReceivedPacketOtherThanConnackOrAuthWhenConnecting,
    /// A Connack was received when already connected
    ReceivedConnackWhenNotConnecting,
    /// The Connack reported a session as present, but a clean start was requested
    UnexpectedSessionPresentForCleanStart,
    /// The Connack had a reason code other than success, which is provided
    Connect(ConnectReasonCode),
    /// The Suback had a reason code that does not grant a subscription, which is provided
    Subscribe(SubscribeReasonCode),
    /// The Puback had an error reason code, which is provided
    Publish(PublishReasonCode),
    /// The Unsuback had an error reason code, which is provided
    Unsubscribe(UnsubscribeReasonCode),
}
56
/// `defmt` formatting for [ClientStateError]: each variant is written as its
/// name, with any payload following in parentheses
#[cfg(feature = "defmt")]
impl defmt::Format for ClientStateError {
    fn format(&self, fmt: defmt::Formatter) {
        match self {
            // Variants carrying a payload
            Self::PacketWrite(error) => defmt::write!(fmt, "PacketWrite({})", error),
            Self::PacketRead(error) => defmt::write!(fmt, "PacketRead({})", error),
            Self::Connect(code) => defmt::write!(fmt, "Connect({})", code),
            Self::Subscribe(code) => defmt::write!(fmt, "Subscribe({})", code),
            Self::Publish(code) => defmt::write!(fmt, "Publish({})", code),
            Self::Unsubscribe(code) => defmt::write!(fmt, "Unsubscribe({})", code),

            // Operations requested in a state that does not permit them
            Self::NotIdle => defmt::write!(fmt, "NotIdle"),
            Self::NotConnected => defmt::write!(fmt, "NotConnected"),
            Self::ClientIsWaitingForResponse => {
                defmt::write!(fmt, "ClientIsWaitingForResponse")
            }
            Self::ReceiveWhenNotConnectedOrConnecting => {
                defmt::write!(fmt, "ReceiveWhenNotConnectedOrConnecting")
            }

            // Unsupported features
            Self::AuthNotSupported => defmt::write!(fmt, "AuthNotSupported"),
            Self::Qos2NotSupported => defmt::write!(fmt, "Qos2NotSupported"),
            Self::ReceivedQos2PublishNotSupported => {
                defmt::write!(fmt, "ReceivedQos2PublishNotSupported")
            }

            // Packets that were not expected
            Self::UnexpectedPuback => defmt::write!(fmt, "UnexpectedPuback"),
            Self::UnexpectedPubackPacketIdentifier => {
                defmt::write!(fmt, "UnexpectedPubackPacketIdentifier")
            }
            Self::UnexpectedSuback => defmt::write!(fmt, "UnexpectedSuback"),
            Self::UnexpectedSubackPacketIdentifier => {
                defmt::write!(fmt, "UnexpectedSubackPacketIdentifier")
            }
            Self::UnexpectedUnsuback => defmt::write!(fmt, "UnexpectedUnsuback"),
            Self::UnexpectedUnsubackPacketIdentifier => {
                defmt::write!(fmt, "UnexpectedUnsubackPacketIdentifier")
            }
            Self::UnexpectedPingresp => defmt::write!(fmt, "UnexpectedPingresp"),
            Self::Disconnect => defmt::write!(fmt, "Disconnect"),
            Self::ServerOnlyMessageReceived => {
                defmt::write!(fmt, "ServerOnlyMessageReceived")
            }
            Self::ReceivedPacketOtherThanConnackOrAuthWhenConnecting => {
                defmt::write!(fmt, "ReceivedPacketOtherThanConnackOrAuthWhenConnecting")
            }
            Self::ReceivedConnackWhenNotConnecting => {
                defmt::write!(fmt, "ReceivedConnackWhenNotConnecting")
            }
            Self::UnexpectedSessionPresentForCleanStart => {
                defmt::write!(fmt, "UnexpectedSessionPresentForCleanStart")
            }
        }
    }
}
105
106impl Display for ClientStateError {
107    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
108        match self {
109            Self::PacketWrite(e) => write!(f, "PacketWrite({})", e),
110            Self::PacketRead(e) => write!(f, "PacketRead({})", e),
111            Self::NotIdle => write!(f, "NotIdle"),
112            Self::AuthNotSupported => write!(f, "AuthNotSupported"),
113            Self::Qos2NotSupported => write!(f, "Qos2NotSupported"),
114            Self::ReceivedQos2PublishNotSupported => write!(f, "ReceivedQos2PublishNotSupported"),
115            Self::ClientIsWaitingForResponse => write!(f, "ClientIsWaitingForResponse"),
116            Self::NotConnected => write!(f, "NotConnected"),
117            Self::ReceiveWhenNotConnectedOrConnecting => {
118                write!(f, "ReceiveWhenNotConnectedOrConnecting")
119            }
120            Self::UnexpectedPuback => write!(f, "UnexpectedPuback"),
121            Self::UnexpectedPubackPacketIdentifier => write!(f, "UnexpectedPubackPacketIdentifier"),
122            Self::UnexpectedSuback => write!(f, "UnexpectedSuback"),
123            Self::UnexpectedSubackPacketIdentifier => write!(f, "UnexpectedSubackPacketIdentifier"),
124            Self::UnexpectedUnsuback => write!(f, "UnexpectedUnsuback"),
125            Self::UnexpectedUnsubackPacketIdentifier => {
126                write!(f, "UnexpectedUnsubackPacketIdentifier")
127            }
128            Self::UnexpectedPingresp => write!(f, "UnexpectedPingresp"),
129            Self::Disconnect => write!(f, "Disconnect"),
130            Self::ServerOnlyMessageReceived => write!(f, "ServerOnlyMessageReceived"),
131            Self::Connect(e) => write!(f, "Connect({})", e),
132            Self::Subscribe(e) => write!(f, "Subscribe({})", e),
133            Self::Publish(e) => write!(f, "Publish({})", e),
134            Self::Unsubscribe(e) => write!(f, "Unsubscribe({})", e),
135            // Self::Overflow => write!(f, "Overflow"),
136            Self::ReceivedPacketOtherThanConnackOrAuthWhenConnecting => {
137                write!(f, "ReceivedPacketOtherThanConnackWhenConnecting")
138            }
139            Self::ReceivedConnackWhenNotConnecting => write!(f, "ReceivedConnackWhenNotConnecting"),
140            Self::UnexpectedSessionPresentForCleanStart => {
141                write!(f, "UnexpectedSessionPresentForCleanStart")
142            }
143        }
144    }
145}
146
/// The result of successfully providing a received packet to
/// [ClientState::receive], describing anything the caller may need to act on.
///
/// `'a` is the lifetime of the data in the received packet, `'b` is the
/// lifetime of any [Puback] produced in response, and `P` is the const
/// parameter shared by the contained packets.
pub enum ClientStateReceiveEvent<'a, 'b, const P: usize> {
    /// Client received an acknowledgement/response for a previous message sent
    /// to the server (e.g. Connack, Puback, Suback, Unsuback, Pingresp)
    /// These are all handled internally by the state, so do not need an external
    /// response
    Ack,

    /// A published message was received
    Publish { publish: Publish<'a, P> },

    /// A published message was received, and it needs to be acknowledged by sending provided puback to the server
    PublishAndPuback {
        publish: Publish<'a, P>,
        puback: Puback<'b, P>,
    },

    /// A subscription was granted but was at lower qos than the maximum requested
    /// This may or may not require action depending on client requirements -
    /// it means that the given subscription will receive published messages at
    /// only the granted qos - if the requested maximum qos was absolutely required
    /// then the client could respond by showing an error to the user stating the
    /// server is incompatible, or possibly trying to unsubscribe and resubscribe,
    /// assuming this is expected to make any difference with the server(s) in use.
    SubscriptionGrantedBelowMaximumQos {
        granted_qos: QualityOfService,
        maximum_qos: QualityOfService,
    },

    /// A published message was received at the server, but had no matching subscribers and
    /// so did not reach any receivers
    /// This may or may not require action depending on client requirements / expectations
    /// E.g. if it was expected there would be subscribers, the client could try resending
    /// the message later
    PublishedMessageHadNoMatchingSubscribers,

    /// Server processed an unsubscribe request, but no such subscription existed on the server,
    /// so nothing changed.
    /// This may or may not require action depending on client requirements / expectations
    /// E.g. if it was expected there would be a subscription, the client could produce
    /// an error, and the user of the client might try reconnecting to the server to set
    /// up subscriptions again.
    NoSubscriptionExisted,

    /// A [Disconnect] packet was received, it should contain a reason for our disconnection
    Disconnect { disconnect: Disconnect<'a, P> },
}
193
194impl From<PacketWriteError> for ClientStateError {
195    fn from(value: PacketWriteError) -> Self {
196        ClientStateError::PacketWrite(value)
197    }
198}
199
200impl From<PacketReadError> for ClientStateError {
201    fn from(value: PacketReadError) -> Self {
202        ClientStateError::PacketRead(value)
203    }
204}
205
206/// Tracks the state of a simple MQTT client, allowing for tracking acknowledgements, and producing the necessary packets for basic actions
207#[allow(async_fn_in_trait)]
208pub trait ClientState {
209    /// Returns true when the state indicates we require a response to a sent
210    /// packet - in this case you must receive packets and provide them to
211    /// `receive` until `waiting_for_responses` returns false.
212    /// The exception is that it is possible to call `disconnect`,
213    /// `send_ping` or `send_message` (with a quality of service of 0),
214    /// while `waiting_for_responses` is true.
215    fn waiting_for_responses(&self) -> bool;
216
217    /// Update state based on a packet used to connect to server
218    /// Call this after connect packet has been successfully sent.
219    fn connect<const P: usize, const W: usize>(
220        &mut self,
221        connect: &Connect<'_, P, W>,
222    ) -> Result<(), ClientStateError>;
223
224    /// Produce a packet to disconnect from server, update state
225    fn disconnect<'b>(&mut self) -> Result<Disconnect<'b, 0>, ClientStateError>;
226
227    /// Produce a packet to ping the server, update state
228    fn send_ping(&mut self) -> Result<Pingreq, ClientStateError>;
229
230    /// Receive a packet
231    /// This updates the client state, and if anything that might require
232    /// action by the caller occurs, a [ClientStateReceiveEvent] is returned.
233    /// Errors indicate an invalid packet was received, message_target errored,
234    /// or the received packet was unexpected based on our state
235    fn receive<'a, 'b, const P: usize, const W: usize, const S: usize>(
236        &mut self,
237        packet: PacketGeneric<'a, P, W, S>,
238    ) -> Result<ClientStateReceiveEvent<'a, 'b, P>, ClientStateError>;
239
240    /// Produce a packet to subscribe to a topic by name, update state
241    fn subscribe<'b>(
242        &mut self,
243        topic_name: &'b str,
244        maximum_qos: QualityOfService,
245    ) -> Result<Subscribe<'b, 0, 0>, ClientStateError>;
246
247    /// Produce a packet to unsubscribe from a topic by name, update state
248    fn unsubscribe<'b>(
249        &mut self,
250        topic_name: &'b str,
251    ) -> Result<Unsubscribe<'b, 0, 0>, ClientStateError>;
252
253    /// Produce a packet to publish to a given topic, update state, with no properties
254    fn publish<'b>(
255        &mut self,
256        topic_name: &'b str,
257        payload: &'b [u8],
258        qos: QualityOfService,
259        retain: bool,
260    ) -> Result<Publish<'b, 0>, ClientStateError> {
261        self.publish_with_properties(topic_name, payload, qos, retain, Vec::new())
262    }
263
264    /// Produce a packet to publish to a given topic, update state, with properties
265    fn publish_with_properties<'b, const P: usize>(
266        &mut self,
267        topic_name: &'b str,
268        payload: &'b [u8],
269        qos: QualityOfService,
270        retain: bool,
271        properties: Vec<PublishProperty<'b>, P>,
272    ) -> Result<Publish<'b, P>, ClientStateError>;
273
274    /// Move to errored state, no further operations are possible
275    /// This must be called if the user of the client state cannot successfully send
276    /// a packet produced by this [ClientState]
277    fn error(&mut self);
278}
279
/// A [ClientState] that tracks at most one outstanding acknowledgement
/// (Puback, Suback or Unsuback) at a time, so needs no queue of pending
/// requests.
#[derive(PartialEq)]
pub enum ClientStateNoQueue {
    /// Initial state, the only state from which `connect` succeeds
    Idle,
    /// `connect` has been called, waiting to receive a Connack
    Connecting(RequestedConnectionInfo),
    /// A successful Connack has been received
    Connected(ConnectionState),
    /// `error` has been called
    Errored,
    /// `disconnect` has been called while connected
    Disconnected,
}
288
/// The settings requested by the Connect packet, kept while waiting for the
/// Connack so that the response can be checked against them
#[derive(PartialEq)]
pub struct RequestedConnectionInfo {
    /// The clean start setting of the Connect packet
    clean_start: bool,
    /// The keep alive of the Connect packet
    keep_alive: u16,
}
294
/// Information about an established connection
#[derive(PartialEq)]
pub struct ConnectionInfo {
    /// Number of pings sent for which no Pingresp has yet been received
    pending_ping_count: u32,
    /// Whether the Connack reported a session as present
    session_present: bool,
    /// The keep alive requested in the Connect packet, unless the Connack
    /// provided a server keep alive property, in which case that value is used
    keep_alive: u16,
}
301
/// The state of an established connection
#[derive(PartialEq)]
pub struct ConnectionState {
    /// Information about the connection
    info: ConnectionInfo,
    /// The acknowledgement we are waiting for, if any
    waiting: Waiting,
}
307
/// The single acknowledgement a connected client is waiting for, if any
#[derive(PartialEq)]
enum Waiting {
    /// Not waiting for any acknowledgement
    None,
    /// Waiting for the Puback of a quality of service 1 publish sent with
    /// the given packet identifier
    ForPuback {
        id: PacketIdentifier,
    },
    /// Waiting for the Suback of a subscribe sent with the given packet
    /// identifier, `qos` is the maximum quality of service that was requested
    ForSuback {
        id: PacketIdentifier,
        qos: QualityOfService,
    },
    /// Waiting for the Unsuback of an unsubscribe sent with the given packet
    /// identifier
    ForUnsuback {
        id: PacketIdentifier,
    },
}
322
323impl Waiting {
324    fn is_waiting(&self) -> bool {
325        match self {
326            Self::None => false,
327            Self::ForPuback { id: _ } => true,
328            Self::ForSuback { id: _, qos: _ } => true,
329            Self::ForUnsuback { id: _ } => true,
330        }
331    }
332}
333
impl ClientStateNoQueue {
    // Each kind of request uses its own fixed packet identifier. A request
    // that needs an acknowledgement is refused while another acknowledgement
    // is outstanding, so no identifier is in use more than once at a time.

    /// Packet identifier used for every quality of service 1 publish
    const PUBLISH_PACKET_IDENTIFIER: PacketIdentifier = PacketIdentifier(1);
    /// Packet identifier used for every subscribe
    const SUBSCRIBE_PACKET_IDENTIFIER: PacketIdentifier = PacketIdentifier(2);
    /// Packet identifier used for every unsubscribe
    const UNSUBSCRIBE_PACKET_IDENTIFIER: PacketIdentifier = PacketIdentifier(3);

    /// Create a new state, which starts as [ClientStateNoQueue::Idle]
    pub fn new() -> Self {
        Self::Idle
    }
}
343
344impl Default for ClientStateNoQueue {
345    fn default() -> Self {
346        Self::new()
347    }
348}
349
350impl ClientState for ClientStateNoQueue {
351    fn waiting_for_responses(&self) -> bool {
352        match self {
353            Self::Idle => false,
354            Self::Connecting(_) => true,
355            Self::Connected(connection_data) => connection_data.waiting.is_waiting(),
356            Self::Errored => false,
357            Self::Disconnected => false,
358        }
359    }
360
361    fn connect<const P: usize, const W: usize>(
362        &mut self,
363        connect: &Connect<'_, P, W>,
364    ) -> Result<(), ClientStateError> {
365        match self {
366            ClientStateNoQueue::Idle => {
367                *self = Self::Connecting(RequestedConnectionInfo {
368                    clean_start: connect.clean_start(),
369                    keep_alive: connect.keep_alive(),
370                });
371                Ok(())
372            }
373            _ => Err(ClientStateError::NotIdle),
374        }
375    }
376
377    fn disconnect<'b>(&mut self) -> Result<Disconnect<'b, 0>, ClientStateError> {
378        match self {
379            ClientStateNoQueue::Connected(_d) => {
380                *self = Self::Disconnected;
381                Ok(Disconnect::default())
382            }
383            _ => Err(ClientStateError::NotConnected),
384        }
385    }
386
387    fn publish_with_properties<'b, const P: usize>(
388        &mut self,
389        topic_name: &'b str,
390        payload: &'b [u8],
391        qos: QualityOfService,
392        retain: bool,
393        properties: Vec<PublishProperty<'b>, P>,
394    ) -> Result<Publish<'b, P>, ClientStateError> {
395        match self {
396            ClientStateNoQueue::Connected(ConnectionState { info: _, waiting }) => {
397                let publish_packet_identifier = match qos {
398                    QualityOfService::Qos0 => Ok(PublishPacketIdentifier::None),
399                    QualityOfService::Qos1 if waiting.is_waiting() => {
400                        Err(ClientStateError::ClientIsWaitingForResponse)
401                    }
402                    QualityOfService::Qos1 => Ok(PublishPacketIdentifier::Qos1(
403                        Self::PUBLISH_PACKET_IDENTIFIER,
404                    )),
405                    QualityOfService::Qos2 => Err(ClientStateError::Qos2NotSupported),
406                }?;
407
408                let publish = Publish::new(
409                    false,
410                    retain,
411                    topic_name,
412                    publish_packet_identifier,
413                    payload,
414                    properties,
415                );
416
417                if qos == QualityOfService::Qos1 {
418                    *waiting = Waiting::ForPuback {
419                        id: Self::PUBLISH_PACKET_IDENTIFIER,
420                    };
421                }
422
423                Ok(publish)
424            }
425            _ => Err(ClientStateError::NotConnected),
426        }
427    }
428
429    fn subscribe<'b>(
430        &mut self,
431        topic_name: &'b str,
432        maximum_qos: QualityOfService,
433    ) -> Result<Subscribe<'b, 0, 0>, ClientStateError> {
434        match self {
435            ClientStateNoQueue::Connected(ConnectionState { info: _, waiting }) => {
436                if waiting.is_waiting() {
437                    Err(ClientStateError::ClientIsWaitingForResponse)
438                } else if maximum_qos == QualityOfService::Qos2 {
439                    Err(ClientStateError::Qos2NotSupported)
440                } else {
441                    let first_request = SubscriptionRequest::new(topic_name, maximum_qos);
442                    let subscribe: Subscribe<'_, 0, 0> = Subscribe::new(
443                        Self::SUBSCRIBE_PACKET_IDENTIFIER,
444                        first_request,
445                        Vec::new(),
446                        Vec::new(),
447                    );
448
449                    *waiting = Waiting::ForSuback {
450                        id: Self::SUBSCRIBE_PACKET_IDENTIFIER,
451                        qos: maximum_qos,
452                    };
453
454                    Ok(subscribe)
455                }
456            }
457            _ => Err(ClientStateError::NotConnected),
458        }
459    }
460
461    fn unsubscribe<'b>(
462        &mut self,
463        topic_name: &'b str,
464    ) -> Result<Unsubscribe<'b, 0, 0>, ClientStateError> {
465        match self {
466            ClientStateNoQueue::Connected(ConnectionState { info: _, waiting }) => {
467                if waiting.is_waiting() {
468                    Err(ClientStateError::ClientIsWaitingForResponse)
469                } else {
470                    let unsubscribe: Unsubscribe<'_, 0, 0> = Unsubscribe::new(
471                        Self::UNSUBSCRIBE_PACKET_IDENTIFIER,
472                        topic_name,
473                        Vec::new(),
474                        Vec::new(),
475                    );
476
477                    *waiting = Waiting::ForUnsuback {
478                        id: Self::UNSUBSCRIBE_PACKET_IDENTIFIER,
479                    };
480
481                    Ok(unsubscribe)
482                }
483            }
484            _ => Err(ClientStateError::NotConnected),
485        }
486    }
487
488    fn send_ping(&mut self) -> Result<Pingreq, ClientStateError> {
489        match self {
490            ClientStateNoQueue::Connected(ConnectionState { info, waiting: _ }) => {
491                info.pending_ping_count += 1;
492                Ok(Pingreq::default())
493            }
494            _ => Err(ClientStateError::NotConnected),
495        }
496    }
497
498    fn receive<'a, 'b, const P: usize, const W: usize, const S: usize>(
499        &mut self,
500        packet: PacketGeneric<'a, P, W, S>,
501    ) -> Result<ClientStateReceiveEvent<'a, 'b, P>, ClientStateError> {
502        match self {
503            // If we are connecting, we only expect a Connack packet
504            // (server cannot disconnect before Connack [MQTT-3.14.0-1])
505            // or an Auth packet [MQTT-3.2.0-1]
506            ClientStateNoQueue::Connecting(RequestedConnectionInfo {
507                clean_start,
508                keep_alive,
509            }) => match packet {
510                PacketGeneric::Connack(connack) => match connack.reason_code() {
511                    ConnectReasonCode::Success => {
512                        let session_present = connack.session_present();
513
514                        // If there's a session, but we requested a clean start, this is an error
515                        if session_present && *clean_start {
516                            return Err(ClientStateError::UnexpectedSessionPresentForCleanStart);
517                        }
518
519                        // Keep alive is the one we requested, unless server returns a new one as a property
520                        let mut actual_keep_alive = *keep_alive;
521                        for p in connack.properties().iter() {
522                            if let ConnackProperty::ServerKeepAlive(server_keep_alive) = p {
523                                actual_keep_alive = server_keep_alive.value();
524                            }
525                        }
526
527                        let info = ConnectionInfo {
528                            pending_ping_count: 0,
529                            session_present,
530                            keep_alive: actual_keep_alive,
531                        };
532
533                        *self = Self::Connected(ConnectionState {
534                            info,
535                            waiting: Waiting::None,
536                        });
537
538                        Ok(ClientStateReceiveEvent::Ack)
539                    }
540                    reason_code => Err(ClientStateError::Connect(*reason_code)),
541                },
542                PacketGeneric::Auth(_) => Err(ClientStateError::AuthNotSupported),
543                _ => Err(ClientStateError::ReceivedPacketOtherThanConnackOrAuthWhenConnecting),
544            },
545
546            // If we are connected, we handle all client packets other than Connack
547            ClientStateNoQueue::Connected(ConnectionState { info, waiting }) => match packet {
548                PacketGeneric::Publish(publish) => match publish.publish_packet_identifier() {
549                    PublishPacketIdentifier::None => {
550                        Ok(ClientStateReceiveEvent::Publish { publish })
551                    }
552                    PublishPacketIdentifier::Qos1(packet_identifier) => {
553                        let puback =
554                            Puback::new(*packet_identifier, PublishReasonCode::Success, Vec::new());
555                        Ok(ClientStateReceiveEvent::PublishAndPuback { publish, puback })
556                    }
557                    PublishPacketIdentifier::Qos2(_) => {
558                        Err(ClientStateError::ReceivedQos2PublishNotSupported)
559                    }
560                },
561
562                PacketGeneric::Puback(puback) => {
563                    let ack_id = puback.packet_identifier();
564                    match waiting {
565                        Waiting::ForPuback { id } if id == ack_id => {
566                            *waiting = Waiting::None;
567
568                            let reason_code = puback.reason_code();
569                            if reason_code.is_error() {
570                                Err(ClientStateError::Publish(*reason_code))
571                            } else if reason_code == &PublishReasonCode::NoMatchingSubscribers {
572                                Ok(ClientStateReceiveEvent::PublishedMessageHadNoMatchingSubscribers)
573                            } else {
574                                Ok(ClientStateReceiveEvent::Ack)
575                            }
576                        }
577                        Waiting::ForPuback { id: _ } => {
578                            Err(ClientStateError::UnexpectedPubackPacketIdentifier)
579                        }
580                        _ => Err(ClientStateError::UnexpectedPuback),
581                    }
582                }
583
584                PacketGeneric::Suback(suback) => {
585                    let ack_id = suback.packet_identifier();
586
587                    match waiting {
588                        Waiting::ForSuback { id, qos } if id == ack_id => {
589                            let maximum_qos = *qos;
590                            *waiting = Waiting::None;
591
592                            let reason_code = suback.first_reason_code();
593                            let granted_qos = match reason_code {
594                                SubscribeReasonCode::Success => QualityOfService::Qos0,
595                                SubscribeReasonCode::GrantedQos1 => QualityOfService::Qos1,
596                                SubscribeReasonCode::GrantedQos2 => QualityOfService::Qos2,
597                                err => return Err(ClientStateError::Subscribe(*err)),
598                            };
599
600                            if granted_qos != maximum_qos {
601                                Ok(
602                                    ClientStateReceiveEvent::SubscriptionGrantedBelowMaximumQos {
603                                        granted_qos,
604                                        maximum_qos,
605                                    },
606                                )
607                            } else {
608                                Ok(ClientStateReceiveEvent::Ack)
609                            }
610                        }
611                        Waiting::ForSuback { id: _, qos: _ } => {
612                            Err(ClientStateError::UnexpectedSubackPacketIdentifier)
613                        }
614                        _ => Err(ClientStateError::UnexpectedSuback),
615                    }
616                }
617                PacketGeneric::Unsuback(unsuback) => {
618                    let ack_id = unsuback.packet_identifier();
619
620                    match waiting {
621                        Waiting::ForUnsuback { id } if id == ack_id => {
622                            *waiting = Waiting::None;
623
624                            let reason_code = unsuback.first_reason_code();
625                            if reason_code.is_error() {
626                                Err(ClientStateError::Unsubscribe(*reason_code))
627                            } else if reason_code == &UnsubscribeReasonCode::NoSubscriptionExisted {
628                                Ok(ClientStateReceiveEvent::NoSubscriptionExisted)
629                            } else {
630                                Ok(ClientStateReceiveEvent::Ack)
631                            }
632                        }
633                        Waiting::ForUnsuback { id: _ } => {
634                            Err(ClientStateError::UnexpectedUnsubackPacketIdentifier)
635                        }
636                        _ => Err(ClientStateError::UnexpectedUnsuback),
637                    }
638                }
639                PacketGeneric::Pingresp(_pingresp) => {
640                    if info.pending_ping_count > 0 {
641                        info.pending_ping_count -= 1;
642                        Ok(ClientStateReceiveEvent::Ack)
643                    } else {
644                        Err(ClientStateError::UnexpectedPingresp)
645                    }
646                }
647                PacketGeneric::Disconnect(disconnect) => {
648                    Ok(ClientStateReceiveEvent::Disconnect { disconnect })
649                }
650                PacketGeneric::Connack(_) => {
651                    Err(ClientStateError::ReceivedConnackWhenNotConnecting)
652                }
653                PacketGeneric::Auth(_auth) => Err(ClientStateError::AuthNotSupported),
654                PacketGeneric::Connect(_)
655                | PacketGeneric::Pubrec(_)
656                | PacketGeneric::Pubrel(_)
657                | PacketGeneric::Pubcomp(_)
658                | PacketGeneric::Subscribe(_)
659                | PacketGeneric::Unsubscribe(_)
660                | PacketGeneric::Pingreq(_) => Err(ClientStateError::ServerOnlyMessageReceived),
661            },
662            _ => Err(ClientStateError::ReceiveWhenNotConnectedOrConnecting),
663        }
664    }
665
666    fn error(&mut self) {
667        *self = Self::Errored;
668    }
669}