1use core::fmt::{Display, Formatter};
2
3use heapless::Vec;
4
5use crate::{
6 data::{
7 packet_identifier::{PacketIdentifier, PublishPacketIdentifier},
8 property::{ConnackProperty, Property, PublishProperty},
9 quality_of_service::QualityOfService,
10 reason_code::{
11 ConnectReasonCode, PublishReasonCode, SubscribeReasonCode, UnsubscribeReasonCode,
12 },
13 },
14 error::{PacketReadError, PacketWriteError},
15 packets::{
16 connect::Connect,
17 disconnect::Disconnect,
18 packet_generic::PacketGeneric,
19 pingreq::Pingreq,
20 puback::Puback,
21 publish::Publish,
22 subscribe::{Subscribe, SubscriptionRequest},
23 unsubscribe::Unsubscribe,
24 },
25};
26
27#[derive(Debug, PartialEq, Clone, Copy)]
29pub enum ClientStateError {
30 PacketWrite(PacketWriteError),
31 PacketRead(PacketReadError),
32 NotIdle,
33 AuthNotSupported,
34 Qos2NotSupported,
35 ReceivedQos2PublishNotSupported,
36 ClientIsWaitingForResponse,
37 NotConnected,
38 ReceiveWhenNotConnectedOrConnecting,
39 UnexpectedPuback,
40 UnexpectedPubackPacketIdentifier,
41 UnexpectedSuback,
42 UnexpectedSubackPacketIdentifier,
43 UnexpectedUnsuback,
44 UnexpectedUnsubackPacketIdentifier,
45 UnexpectedPingresp,
46 Disconnect,
47 ServerOnlyMessageReceived,
48 ReceivedPacketOtherThanConnackOrAuthWhenConnecting,
49 ReceivedConnackWhenNotConnecting,
50 UnexpectedSessionPresentForCleanStart,
51 Connect(ConnectReasonCode),
52 Subscribe(SubscribeReasonCode),
53 Publish(PublishReasonCode),
54 Unsubscribe(UnsubscribeReasonCode),
55}
56
57#[cfg(feature = "defmt")]
58impl defmt::Format for ClientStateError {
59 fn format(&self, f: defmt::Formatter) {
60 match self {
61 Self::PacketWrite(e) => defmt::write!(f, "PacketWrite({})", e),
62 Self::PacketRead(e) => defmt::write!(f, "PacketRead({})", e),
63 Self::NotIdle => defmt::write!(f, "NotIdle"),
64 Self::AuthNotSupported => defmt::write!(f, "AuthNotSupported"),
65 Self::Qos2NotSupported => defmt::write!(f, "Qos2NotSupported"),
66 Self::ReceivedQos2PublishNotSupported => {
67 defmt::write!(f, "ReceivedQos2PublishNotSupported")
68 }
69 Self::ClientIsWaitingForResponse => defmt::write!(f, "ClientIsWaitingForResponse"),
70 Self::NotConnected => defmt::write!(f, "NotConnected"),
71 Self::ReceiveWhenNotConnectedOrConnecting => {
72 defmt::write!(f, "ReceiveWhenNotConnectedOrConnecting")
73 }
74 Self::UnexpectedPuback => defmt::write!(f, "UnexpectedPuback"),
75 Self::UnexpectedPubackPacketIdentifier => {
76 defmt::write!(f, "UnexpectedPubackPacketIdentifier")
77 }
78 Self::UnexpectedSuback => defmt::write!(f, "UnexpectedSuback"),
79 Self::UnexpectedSubackPacketIdentifier => {
80 defmt::write!(f, "UnexpectedSubackPacketIdentifier")
81 }
82 Self::UnexpectedUnsuback => defmt::write!(f, "UnexpectedUnsuback"),
83 Self::UnexpectedUnsubackPacketIdentifier => {
84 defmt::write!(f, "UnexpectedUnsubackPacketIdentifier")
85 }
86 Self::UnexpectedPingresp => defmt::write!(f, "UnexpectedPingresp"),
87 Self::Disconnect => defmt::write!(f, "Disconnect"),
88 Self::ServerOnlyMessageReceived => defmt::write!(f, "ServerOnlyMessageReceived"),
89 Self::ReceivedPacketOtherThanConnackOrAuthWhenConnecting => {
90 defmt::write!(f, "ReceivedPacketOtherThanConnackOrAuthWhenConnecting")
91 }
92 Self::ReceivedConnackWhenNotConnecting => {
93 defmt::write!(f, "ReceivedConnackWhenNotConnecting")
94 }
95 Self::UnexpectedSessionPresentForCleanStart => {
96 defmt::write!(f, "UnexpectedSessionPresentForCleanStart")
97 }
98 Self::Connect(r) => defmt::write!(f, "Connect({})", r),
99 Self::Subscribe(r) => defmt::write!(f, "Subscribe({})", r),
100 Self::Publish(r) => defmt::write!(f, "Publish({})", r),
101 Self::Unsubscribe(r) => defmt::write!(f, "Unsubscribe({})", r),
102 }
103 }
104}
105
106impl Display for ClientStateError {
107 fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
108 match self {
109 Self::PacketWrite(e) => write!(f, "PacketWrite({})", e),
110 Self::PacketRead(e) => write!(f, "PacketRead({})", e),
111 Self::NotIdle => write!(f, "NotIdle"),
112 Self::AuthNotSupported => write!(f, "AuthNotSupported"),
113 Self::Qos2NotSupported => write!(f, "Qos2NotSupported"),
114 Self::ReceivedQos2PublishNotSupported => write!(f, "ReceivedQos2PublishNotSupported"),
115 Self::ClientIsWaitingForResponse => write!(f, "ClientIsWaitingForResponse"),
116 Self::NotConnected => write!(f, "NotConnected"),
117 Self::ReceiveWhenNotConnectedOrConnecting => {
118 write!(f, "ReceiveWhenNotConnectedOrConnecting")
119 }
120 Self::UnexpectedPuback => write!(f, "UnexpectedPuback"),
121 Self::UnexpectedPubackPacketIdentifier => write!(f, "UnexpectedPubackPacketIdentifier"),
122 Self::UnexpectedSuback => write!(f, "UnexpectedSuback"),
123 Self::UnexpectedSubackPacketIdentifier => write!(f, "UnexpectedSubackPacketIdentifier"),
124 Self::UnexpectedUnsuback => write!(f, "UnexpectedUnsuback"),
125 Self::UnexpectedUnsubackPacketIdentifier => {
126 write!(f, "UnexpectedUnsubackPacketIdentifier")
127 }
128 Self::UnexpectedPingresp => write!(f, "UnexpectedPingresp"),
129 Self::Disconnect => write!(f, "Disconnect"),
130 Self::ServerOnlyMessageReceived => write!(f, "ServerOnlyMessageReceived"),
131 Self::Connect(e) => write!(f, "Connect({})", e),
132 Self::Subscribe(e) => write!(f, "Subscribe({})", e),
133 Self::Publish(e) => write!(f, "Publish({})", e),
134 Self::Unsubscribe(e) => write!(f, "Unsubscribe({})", e),
135 Self::ReceivedPacketOtherThanConnackOrAuthWhenConnecting => {
137 write!(f, "ReceivedPacketOtherThanConnackWhenConnecting")
138 }
139 Self::ReceivedConnackWhenNotConnecting => write!(f, "ReceivedConnackWhenNotConnecting"),
140 Self::UnexpectedSessionPresentForCleanStart => {
141 write!(f, "UnexpectedSessionPresentForCleanStart")
142 }
143 }
144 }
145}
146
147pub enum ClientStateReceiveEvent<'a, 'b, const P: usize> {
148 Ack,
153
154 Publish { publish: Publish<'a, P> },
156
157 PublishAndPuback {
159 publish: Publish<'a, P>,
160 puback: Puback<'b, P>,
161 },
162
163 SubscriptionGrantedBelowMaximumQos {
171 granted_qos: QualityOfService,
172 maximum_qos: QualityOfService,
173 },
174
175 PublishedMessageHadNoMatchingSubscribers,
181
182 NoSubscriptionExisted,
189
190 Disconnect { disconnect: Disconnect<'a, P> },
192}
193
194impl From<PacketWriteError> for ClientStateError {
195 fn from(value: PacketWriteError) -> Self {
196 ClientStateError::PacketWrite(value)
197 }
198}
199
200impl From<PacketReadError> for ClientStateError {
201 fn from(value: PacketReadError) -> Self {
202 ClientStateError::PacketRead(value)
203 }
204}
205
206#[allow(async_fn_in_trait)]
208pub trait ClientState {
209 fn waiting_for_responses(&self) -> bool;
216
217 fn connect<const P: usize, const W: usize>(
220 &mut self,
221 connect: &Connect<'_, P, W>,
222 ) -> Result<(), ClientStateError>;
223
224 fn disconnect<'b>(&mut self) -> Result<Disconnect<'b, 0>, ClientStateError>;
226
227 fn send_ping(&mut self) -> Result<Pingreq, ClientStateError>;
229
230 fn receive<'a, 'b, const P: usize, const W: usize, const S: usize>(
236 &mut self,
237 packet: PacketGeneric<'a, P, W, S>,
238 ) -> Result<ClientStateReceiveEvent<'a, 'b, P>, ClientStateError>;
239
240 fn subscribe<'b>(
242 &mut self,
243 topic_name: &'b str,
244 maximum_qos: QualityOfService,
245 ) -> Result<Subscribe<'b, 0, 0>, ClientStateError>;
246
247 fn unsubscribe<'b>(
249 &mut self,
250 topic_name: &'b str,
251 ) -> Result<Unsubscribe<'b, 0, 0>, ClientStateError>;
252
253 fn publish<'b>(
255 &mut self,
256 topic_name: &'b str,
257 payload: &'b [u8],
258 qos: QualityOfService,
259 retain: bool,
260 ) -> Result<Publish<'b, 0>, ClientStateError> {
261 self.publish_with_properties(topic_name, payload, qos, retain, Vec::new())
262 }
263
264 fn publish_with_properties<'b, const P: usize>(
266 &mut self,
267 topic_name: &'b str,
268 payload: &'b [u8],
269 qos: QualityOfService,
270 retain: bool,
271 properties: Vec<PublishProperty<'b>, P>,
272 ) -> Result<Publish<'b, P>, ClientStateError>;
273
274 fn error(&mut self);
278}
279
280#[derive(PartialEq)]
281pub enum ClientStateNoQueue {
282 Idle,
283 Connecting(RequestedConnectionInfo),
284 Connected(ConnectionState),
285 Errored,
286 Disconnected,
287}
288
289#[derive(PartialEq)]
290pub struct RequestedConnectionInfo {
291 clean_start: bool,
292 keep_alive: u16,
293}
294
295#[derive(PartialEq)]
296pub struct ConnectionInfo {
297 pending_ping_count: u32,
298 session_present: bool,
299 keep_alive: u16,
300}
301
302#[derive(PartialEq)]
303pub struct ConnectionState {
304 info: ConnectionInfo,
305 waiting: Waiting,
306}
307
308#[derive(PartialEq)]
309enum Waiting {
310 None,
311 ForPuback {
312 id: PacketIdentifier,
313 },
314 ForSuback {
315 id: PacketIdentifier,
316 qos: QualityOfService,
317 },
318 ForUnsuback {
319 id: PacketIdentifier,
320 },
321}
322
323impl Waiting {
324 fn is_waiting(&self) -> bool {
325 match self {
326 Self::None => false,
327 Self::ForPuback { id: _ } => true,
328 Self::ForSuback { id: _, qos: _ } => true,
329 Self::ForUnsuback { id: _ } => true,
330 }
331 }
332}
333
334impl ClientStateNoQueue {
335 const PUBLISH_PACKET_IDENTIFIER: PacketIdentifier = PacketIdentifier(1);
336 const SUBSCRIBE_PACKET_IDENTIFIER: PacketIdentifier = PacketIdentifier(2);
337 const UNSUBSCRIBE_PACKET_IDENTIFIER: PacketIdentifier = PacketIdentifier(3);
338
339 pub fn new() -> Self {
340 Self::Idle
341 }
342}
343
344impl Default for ClientStateNoQueue {
345 fn default() -> Self {
346 Self::new()
347 }
348}
349
350impl ClientState for ClientStateNoQueue {
351 fn waiting_for_responses(&self) -> bool {
352 match self {
353 Self::Idle => false,
354 Self::Connecting(_) => true,
355 Self::Connected(connection_data) => connection_data.waiting.is_waiting(),
356 Self::Errored => false,
357 Self::Disconnected => false,
358 }
359 }
360
361 fn connect<const P: usize, const W: usize>(
362 &mut self,
363 connect: &Connect<'_, P, W>,
364 ) -> Result<(), ClientStateError> {
365 match self {
366 ClientStateNoQueue::Idle => {
367 *self = Self::Connecting(RequestedConnectionInfo {
368 clean_start: connect.clean_start(),
369 keep_alive: connect.keep_alive(),
370 });
371 Ok(())
372 }
373 _ => Err(ClientStateError::NotIdle),
374 }
375 }
376
377 fn disconnect<'b>(&mut self) -> Result<Disconnect<'b, 0>, ClientStateError> {
378 match self {
379 ClientStateNoQueue::Connected(_d) => {
380 *self = Self::Disconnected;
381 Ok(Disconnect::default())
382 }
383 _ => Err(ClientStateError::NotConnected),
384 }
385 }
386
387 fn publish_with_properties<'b, const P: usize>(
388 &mut self,
389 topic_name: &'b str,
390 payload: &'b [u8],
391 qos: QualityOfService,
392 retain: bool,
393 properties: Vec<PublishProperty<'b>, P>,
394 ) -> Result<Publish<'b, P>, ClientStateError> {
395 match self {
396 ClientStateNoQueue::Connected(ConnectionState { info: _, waiting }) => {
397 let publish_packet_identifier = match qos {
398 QualityOfService::Qos0 => Ok(PublishPacketIdentifier::None),
399 QualityOfService::Qos1 if waiting.is_waiting() => {
400 Err(ClientStateError::ClientIsWaitingForResponse)
401 }
402 QualityOfService::Qos1 => Ok(PublishPacketIdentifier::Qos1(
403 Self::PUBLISH_PACKET_IDENTIFIER,
404 )),
405 QualityOfService::Qos2 => Err(ClientStateError::Qos2NotSupported),
406 }?;
407
408 let publish = Publish::new(
409 false,
410 retain,
411 topic_name,
412 publish_packet_identifier,
413 payload,
414 properties,
415 );
416
417 if qos == QualityOfService::Qos1 {
418 *waiting = Waiting::ForPuback {
419 id: Self::PUBLISH_PACKET_IDENTIFIER,
420 };
421 }
422
423 Ok(publish)
424 }
425 _ => Err(ClientStateError::NotConnected),
426 }
427 }
428
429 fn subscribe<'b>(
430 &mut self,
431 topic_name: &'b str,
432 maximum_qos: QualityOfService,
433 ) -> Result<Subscribe<'b, 0, 0>, ClientStateError> {
434 match self {
435 ClientStateNoQueue::Connected(ConnectionState { info: _, waiting }) => {
436 if waiting.is_waiting() {
437 Err(ClientStateError::ClientIsWaitingForResponse)
438 } else if maximum_qos == QualityOfService::Qos2 {
439 Err(ClientStateError::Qos2NotSupported)
440 } else {
441 let first_request = SubscriptionRequest::new(topic_name, maximum_qos);
442 let subscribe: Subscribe<'_, 0, 0> = Subscribe::new(
443 Self::SUBSCRIBE_PACKET_IDENTIFIER,
444 first_request,
445 Vec::new(),
446 Vec::new(),
447 );
448
449 *waiting = Waiting::ForSuback {
450 id: Self::SUBSCRIBE_PACKET_IDENTIFIER,
451 qos: maximum_qos,
452 };
453
454 Ok(subscribe)
455 }
456 }
457 _ => Err(ClientStateError::NotConnected),
458 }
459 }
460
461 fn unsubscribe<'b>(
462 &mut self,
463 topic_name: &'b str,
464 ) -> Result<Unsubscribe<'b, 0, 0>, ClientStateError> {
465 match self {
466 ClientStateNoQueue::Connected(ConnectionState { info: _, waiting }) => {
467 if waiting.is_waiting() {
468 Err(ClientStateError::ClientIsWaitingForResponse)
469 } else {
470 let unsubscribe: Unsubscribe<'_, 0, 0> = Unsubscribe::new(
471 Self::UNSUBSCRIBE_PACKET_IDENTIFIER,
472 topic_name,
473 Vec::new(),
474 Vec::new(),
475 );
476
477 *waiting = Waiting::ForUnsuback {
478 id: Self::UNSUBSCRIBE_PACKET_IDENTIFIER,
479 };
480
481 Ok(unsubscribe)
482 }
483 }
484 _ => Err(ClientStateError::NotConnected),
485 }
486 }
487
488 fn send_ping(&mut self) -> Result<Pingreq, ClientStateError> {
489 match self {
490 ClientStateNoQueue::Connected(ConnectionState { info, waiting: _ }) => {
491 info.pending_ping_count += 1;
492 Ok(Pingreq::default())
493 }
494 _ => Err(ClientStateError::NotConnected),
495 }
496 }
497
498 fn receive<'a, 'b, const P: usize, const W: usize, const S: usize>(
499 &mut self,
500 packet: PacketGeneric<'a, P, W, S>,
501 ) -> Result<ClientStateReceiveEvent<'a, 'b, P>, ClientStateError> {
502 match self {
503 ClientStateNoQueue::Connecting(RequestedConnectionInfo {
507 clean_start,
508 keep_alive,
509 }) => match packet {
510 PacketGeneric::Connack(connack) => match connack.reason_code() {
511 ConnectReasonCode::Success => {
512 let session_present = connack.session_present();
513
514 if session_present && *clean_start {
516 return Err(ClientStateError::UnexpectedSessionPresentForCleanStart);
517 }
518
519 let mut actual_keep_alive = *keep_alive;
521 for p in connack.properties().iter() {
522 if let ConnackProperty::ServerKeepAlive(server_keep_alive) = p {
523 actual_keep_alive = server_keep_alive.value();
524 }
525 }
526
527 let info = ConnectionInfo {
528 pending_ping_count: 0,
529 session_present,
530 keep_alive: actual_keep_alive,
531 };
532
533 *self = Self::Connected(ConnectionState {
534 info,
535 waiting: Waiting::None,
536 });
537
538 Ok(ClientStateReceiveEvent::Ack)
539 }
540 reason_code => Err(ClientStateError::Connect(*reason_code)),
541 },
542 PacketGeneric::Auth(_) => Err(ClientStateError::AuthNotSupported),
543 _ => Err(ClientStateError::ReceivedPacketOtherThanConnackOrAuthWhenConnecting),
544 },
545
546 ClientStateNoQueue::Connected(ConnectionState { info, waiting }) => match packet {
548 PacketGeneric::Publish(publish) => match publish.publish_packet_identifier() {
549 PublishPacketIdentifier::None => {
550 Ok(ClientStateReceiveEvent::Publish { publish })
551 }
552 PublishPacketIdentifier::Qos1(packet_identifier) => {
553 let puback =
554 Puback::new(*packet_identifier, PublishReasonCode::Success, Vec::new());
555 Ok(ClientStateReceiveEvent::PublishAndPuback { publish, puback })
556 }
557 PublishPacketIdentifier::Qos2(_) => {
558 Err(ClientStateError::ReceivedQos2PublishNotSupported)
559 }
560 },
561
562 PacketGeneric::Puback(puback) => {
563 let ack_id = puback.packet_identifier();
564 match waiting {
565 Waiting::ForPuback { id } if id == ack_id => {
566 *waiting = Waiting::None;
567
568 let reason_code = puback.reason_code();
569 if reason_code.is_error() {
570 Err(ClientStateError::Publish(*reason_code))
571 } else if reason_code == &PublishReasonCode::NoMatchingSubscribers {
572 Ok(ClientStateReceiveEvent::PublishedMessageHadNoMatchingSubscribers)
573 } else {
574 Ok(ClientStateReceiveEvent::Ack)
575 }
576 }
577 Waiting::ForPuback { id: _ } => {
578 Err(ClientStateError::UnexpectedPubackPacketIdentifier)
579 }
580 _ => Err(ClientStateError::UnexpectedPuback),
581 }
582 }
583
584 PacketGeneric::Suback(suback) => {
585 let ack_id = suback.packet_identifier();
586
587 match waiting {
588 Waiting::ForSuback { id, qos } if id == ack_id => {
589 let maximum_qos = *qos;
590 *waiting = Waiting::None;
591
592 let reason_code = suback.first_reason_code();
593 let granted_qos = match reason_code {
594 SubscribeReasonCode::Success => QualityOfService::Qos0,
595 SubscribeReasonCode::GrantedQos1 => QualityOfService::Qos1,
596 SubscribeReasonCode::GrantedQos2 => QualityOfService::Qos2,
597 err => return Err(ClientStateError::Subscribe(*err)),
598 };
599
600 if granted_qos != maximum_qos {
601 Ok(
602 ClientStateReceiveEvent::SubscriptionGrantedBelowMaximumQos {
603 granted_qos,
604 maximum_qos,
605 },
606 )
607 } else {
608 Ok(ClientStateReceiveEvent::Ack)
609 }
610 }
611 Waiting::ForSuback { id: _, qos: _ } => {
612 Err(ClientStateError::UnexpectedSubackPacketIdentifier)
613 }
614 _ => Err(ClientStateError::UnexpectedSuback),
615 }
616 }
617 PacketGeneric::Unsuback(unsuback) => {
618 let ack_id = unsuback.packet_identifier();
619
620 match waiting {
621 Waiting::ForUnsuback { id } if id == ack_id => {
622 *waiting = Waiting::None;
623
624 let reason_code = unsuback.first_reason_code();
625 if reason_code.is_error() {
626 Err(ClientStateError::Unsubscribe(*reason_code))
627 } else if reason_code == &UnsubscribeReasonCode::NoSubscriptionExisted {
628 Ok(ClientStateReceiveEvent::NoSubscriptionExisted)
629 } else {
630 Ok(ClientStateReceiveEvent::Ack)
631 }
632 }
633 Waiting::ForUnsuback { id: _ } => {
634 Err(ClientStateError::UnexpectedUnsubackPacketIdentifier)
635 }
636 _ => Err(ClientStateError::UnexpectedUnsuback),
637 }
638 }
639 PacketGeneric::Pingresp(_pingresp) => {
640 if info.pending_ping_count > 0 {
641 info.pending_ping_count -= 1;
642 Ok(ClientStateReceiveEvent::Ack)
643 } else {
644 Err(ClientStateError::UnexpectedPingresp)
645 }
646 }
647 PacketGeneric::Disconnect(disconnect) => {
648 Ok(ClientStateReceiveEvent::Disconnect { disconnect })
649 }
650 PacketGeneric::Connack(_) => {
651 Err(ClientStateError::ReceivedConnackWhenNotConnecting)
652 }
653 PacketGeneric::Auth(_auth) => Err(ClientStateError::AuthNotSupported),
654 PacketGeneric::Connect(_)
655 | PacketGeneric::Pubrec(_)
656 | PacketGeneric::Pubrel(_)
657 | PacketGeneric::Pubcomp(_)
658 | PacketGeneric::Subscribe(_)
659 | PacketGeneric::Unsubscribe(_)
660 | PacketGeneric::Pingreq(_) => Err(ClientStateError::ServerOnlyMessageReceived),
661 },
662 _ => Err(ClientStateError::ReceiveWhenNotConnectedOrConnecting),
663 }
664 }
665
666 fn error(&mut self) {
667 *self = Self::Errored;
668 }
669}