1use core::{
2 fmt::{Display, Formatter},
3 str::Utf8Error,
4};
5
6use heapless::Vec;
7
8use crate::{
9 client_state::{ClientState, ClientStateError, ClientStateNoQueue, ClientStateReceiveEvent},
10 codec::write,
11 data::{
12 property::{ConnectProperty, PublishProperty},
13 quality_of_service::QualityOfService,
14 reason_code::DisconnectReasonCode,
15 },
16 error::{PacketReadError, PacketWriteError},
17 packet_client::{Connection, PacketClient},
18 packets::{
19 connect::{Connect, Will},
20 packet::{Packet, KEEP_ALIVE_DEFAULT},
21 packet_generic::PacketGeneric,
22 publish::{ApplicationMessage, Publish},
23 },
24};
25
26#[derive(Debug, PartialEq, Clone, Copy)]
35pub enum EventHandlerError {
36 Utf8(Utf8Error),
39
40 UnexpectedApplicationMessageTopic,
43
44 InvalidApplicationMessage,
49
50 UnexpectedApplicationMessage,
53
54 Overflow,
58
59 Closed,
63
64 SubscriptionGrantedBelowMaximumQos {
67 granted_qos: QualityOfService,
68 maximum_qos: QualityOfService,
69 },
70
71 PublishedMessageHadNoMatchingSubscribers,
74
75 NoSubscriptionExisted,
78}
79#[cfg(feature = "defmt")]
80impl defmt::Format for EventHandlerError {
81 fn format(&self, f: defmt::Formatter) {
82 match self {
83 Self::Utf8(_) => defmt::write!(f, "Utf8"),
84 Self::UnexpectedApplicationMessageTopic => {
85 defmt::write!(f, "UnexpectedApplicationMessageTopic")
86 }
87 Self::InvalidApplicationMessage => defmt::write!(f, "InvalidApplicationMessage"),
88 Self::UnexpectedApplicationMessage => defmt::write!(f, "UnexpectedApplicationMessage"),
89 Self::Overflow => defmt::write!(f, "Overflow"),
90 Self::SubscriptionGrantedBelowMaximumQos {
91 granted_qos,
92 maximum_qos,
93 } => defmt::write!(
94 f,
95 "SubscriptionGrantedBelowMaximumQos(granted_qos: {}, maximum_qos: {})",
96 granted_qos,
97 maximum_qos
98 ),
99 Self::PublishedMessageHadNoMatchingSubscribers => {
100 defmt::write!(f, "PublishedMessageHadNoMatchingSubscribers")
101 }
102 Self::NoSubscriptionExisted => defmt::write!(f, "NoSubscriptionExisted"),
103 Self::Closed => defmt::write!(f, "Closed"),
104 }
105 }
106}
107
108impl From<Utf8Error> for EventHandlerError {
109 fn from(value: Utf8Error) -> Self {
110 Self::Utf8(value)
111 }
112}
113
114impl Display for EventHandlerError {
115 fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
116 match self {
117 Self::Utf8(e) => write!(f, "Utf8({})", e),
118 Self::UnexpectedApplicationMessageTopic => {
119 write!(f, "UnexpectedApplicationMessageTopic")
120 }
121 Self::InvalidApplicationMessage => write!(f, "InvalidApplicationMessage"),
122 Self::UnexpectedApplicationMessage => write!(f, "UnexpectedApplicationMessage"),
123 Self::Overflow => write!(f, "Overflow"),
124 Self::SubscriptionGrantedBelowMaximumQos {
125 granted_qos,
126 maximum_qos,
127 } => write!(
128 f,
129 "SubscriptionGrantedBelowMaximumQos(granted_qos: {}, maximum_qos: {})",
130 granted_qos, maximum_qos
131 ),
132 Self::PublishedMessageHadNoMatchingSubscribers => {
133 write!(f, "PublishedMessageHadNoMatchingSubscribers")
134 }
135 Self::NoSubscriptionExisted => write!(f, "NoSubscriptionExisted"),
136 Self::Closed => write!(f, "Closed"),
137 }
138 }
139}
140
141#[derive(Debug, PartialEq, Clone, Copy)]
143pub enum ClientError {
144 PacketWrite(PacketWriteError),
145 PacketRead(PacketReadError),
146 ClientState(ClientStateError),
147 TimeoutOnResponsePacket,
148 Disconnected(DisconnectReasonCode),
149 EventHandler(EventHandlerError),
150 EmptyTopicNameWithAliasesDisabled,
155}
156
157#[cfg(feature = "defmt")]
158impl defmt::Format for ClientError {
159 fn format(&self, f: defmt::Formatter) {
160 match self {
161 Self::PacketWrite(e) => defmt::write!(f, "PacketWrite({})", e),
162 Self::PacketRead(e) => defmt::write!(f, "PacketRead({})", e),
163 Self::ClientState(e) => defmt::write!(f, "ClientState({})", e),
164 Self::TimeoutOnResponsePacket => defmt::write!(f, "TimeoutOnResponsePacket"),
165 Self::Disconnected(r) => defmt::write!(f, "Disconnected({})", r),
166 Self::EventHandler(e) => defmt::write!(f, "EventHandler({})", e),
167 Self::EmptyTopicNameWithAliasesDisabled => {
168 defmt::write!(f, "EmptyTopicNameWithAliasesDisabled")
169 }
170 }
171 }
172}
173
174impl From<ClientStateError> for ClientError {
175 fn from(value: ClientStateError) -> Self {
176 ClientError::ClientState(value)
177 }
178}
179
180impl From<PacketWriteError> for ClientError {
181 fn from(value: PacketWriteError) -> Self {
182 ClientError::PacketWrite(value)
183 }
184}
185
186impl From<PacketReadError> for ClientError {
187 fn from(value: PacketReadError) -> Self {
188 ClientError::PacketRead(value)
189 }
190}
191
192impl From<EventHandlerError> for ClientError {
193 fn from(value: EventHandlerError) -> Self {
194 ClientError::EventHandler(value)
195 }
196}
197
198impl Display for ClientError {
199 fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
200 match self {
201 Self::PacketWrite(e) => write!(f, "PacketWrite({})", e),
202 Self::PacketRead(e) => write!(f, "PacketRead({})", e),
203 Self::ClientState(e) => write!(f, "ClientState({})", e),
204 Self::TimeoutOnResponsePacket => write!(f, "TimeoutOnResponsePacket"),
205 Self::Disconnected(e) => write!(f, "Disconnected({})", e),
206 Self::EventHandler(e) => write!(f, "EventHandler({})", e),
207 Self::EmptyTopicNameWithAliasesDisabled => write!(f, "EmptyTopicWithAliasesDisabled"),
208 }
209 }
210}
211
212#[allow(async_fn_in_trait)]
214pub trait Client<'a> {
215 async fn connect(&mut self, settings: &ConnectionSettings) -> Result<(), ClientError>;
217
218 async fn connect_with_will<const W: usize>(
220 &mut self,
221 settings: &ConnectionSettings,
222 will: Option<Will<'_, W>>,
223 ) -> Result<(), ClientError>;
224
225 async fn disconnect(&mut self) -> Result<(), ClientError>;
227
228 async fn send_ping(&mut self) -> Result<(), ClientError>;
230
231 async fn poll(&mut self, wait: bool) -> Result<bool, ClientError>;
243
244 async fn subscribe<'b>(
246 &'b mut self,
247 topic_name: &'b str,
248 maximum_qos: QualityOfService,
249 ) -> Result<(), ClientError>;
250
251 async fn unsubscribe<'b>(&'b mut self, topic_name: &'b str) -> Result<(), ClientError>;
253
254 async fn publish<'b>(
256 &'b mut self,
257 topic_name: &'b str,
258 payload: &'b [u8],
259 qos: QualityOfService,
260 retain: bool,
261 ) -> Result<(), ClientError> {
262 self.publish_with_properties::<0>(topic_name, payload, qos, retain, Vec::new())
263 .await
264 }
265
266 async fn publish_with_properties<'b, const P: usize>(
268 &'b mut self,
269 topic_name: &'b str,
270 payload: &'b [u8],
271 qos: QualityOfService,
272 retain: bool,
273 properties: Vec<PublishProperty<'b>, P>,
274 ) -> Result<(), ClientError>;
275
276 async fn perform<'b, const P: usize>(
280 &'b mut self,
281 action: ClientAction<'b, P>,
282 ) -> Result<(), ClientError>;
283}
284
285pub enum ClientAction<'a, const P: usize> {
286 Subscribe {
287 topic_name: &'a str,
288 maximum_qos: QualityOfService,
289 },
290 Unsubscribe {
291 topic_name: &'a str,
292 },
293 Publish {
294 topic_name: &'a str,
295 payload: &'a [u8],
296 qos: QualityOfService,
297 retain: bool,
298 },
299 PublishWithProperties {
300 topic_name: &'a str,
301 payload: &'a [u8],
302 qos: QualityOfService,
303 retain: bool,
304 properties: Vec<PublishProperty<'a>, P>,
305 },
306}
307
308#[allow(async_fn_in_trait)]
309pub trait Delay {
310 async fn delay_us(&mut self, us: u32);
313}
314
315pub struct ConnectionSettings<'a> {
316 keep_alive: u16,
317 username: Option<&'a str>,
318 password: Option<&'a [u8]>,
319 client_id: &'a str,
320}
321
322impl<'a> ConnectionSettings<'a> {
323 pub fn unauthenticated(client_id: &'a str) -> ConnectionSettings<'a> {
324 Self {
325 keep_alive: KEEP_ALIVE_DEFAULT,
326 username: None,
327 password: None,
328 client_id,
329 }
330 }
331 pub fn client_id(&self) -> &'a str {
332 self.client_id
333 }
334}
335
336#[derive(Debug, PartialEq)]
337pub enum ClientReceivedEvent<'a, const P: usize> {
338 ApplicationMessage(ApplicationMessage<'a, P>),
340
341 Ack,
352
353 SubscriptionGrantedBelowMaximumQos {
361 granted_qos: QualityOfService,
362 maximum_qos: QualityOfService,
363 },
364
365 PublishedMessageHadNoMatchingSubscribers,
371
372 NoSubscriptionExisted,
379}
380
381impl<'a, const P: usize> From<Publish<'a, P>> for ClientReceivedEvent<'a, P> {
382 fn from(value: Publish<'a, P>) -> Self {
383 Self::ApplicationMessage(value.into())
384 }
385}
386
387#[allow(async_fn_in_trait)]
388pub trait EventHandler<const P: usize> {
389 async fn handle_event(
390 &mut self,
391 event: ClientReceivedEvent<P>,
392 ) -> Result<(), EventHandlerError>;
393}
394
395pub struct ClientNoQueue<'a, C, D, F, const P: usize>
396where
397 C: Connection,
398 D: Delay,
399 F: EventHandler<P>,
400{
401 packet_client: PacketClient<'a, C>,
402 client_state: ClientStateNoQueue,
403 delay: D,
404 timeout_millis: u32,
405 event_handler: F,
406}
407
408impl<'a, C, D, F, const P: usize> ClientNoQueue<'a, C, D, F, P>
409where
410 C: Connection,
411 D: Delay,
412 F: EventHandler<P>,
413{
414 pub fn new(
415 connection: C,
416 buf: &'a mut [u8],
417 delay: D,
418 timeout_millis: u32,
419 event_handler: F,
420 ) -> Self {
421 let packet_client = PacketClient::new(connection, buf);
422 let client_state = ClientStateNoQueue::default();
423 Self {
424 packet_client,
425 client_state,
426 delay,
427 timeout_millis,
428 event_handler,
429 }
430 }
431
432 async fn wait_for_responses(&mut self, timeout_millis: u32) -> Result<(), ClientError> {
433 let mut elapsed = 0;
434 let mut waiting = self.client_state.waiting_for_responses();
435 while waiting && elapsed <= timeout_millis {
436 self.poll(false).await?;
437 waiting = self.client_state.waiting_for_responses();
438 elapsed += 1;
439 self.delay.delay_us(1000).await;
440 }
441
442 if waiting {
443 Err(ClientError::TimeoutOnResponsePacket)
444 } else {
445 Ok(())
446 }
447 }
448
449 async fn send_wait_for_responses<PW>(&mut self, packet: PW) -> Result<(), ClientError>
450 where
451 PW: Packet + write::Write,
452 {
453 match self.packet_client.send(packet).await {
454 Ok(()) => {
455 self.wait_for_responses(self.timeout_millis).await?;
456 Ok(())
457 }
458 Err(e) => {
459 self.client_state.error();
460 Err(e.into())
461 }
462 }
463 }
464
465 async fn send<PW>(&mut self, packet: PW) -> Result<(), ClientError>
466 where
467 PW: Packet + write::Write,
468 {
469 let r = self.packet_client.send(packet).await;
470 if r.is_err() {
471 self.client_state.error();
472 }
473 r?;
474 Ok(())
475 }
476}
477
478impl<'a, C, D, F, const P: usize> Client<'a> for ClientNoQueue<'a, C, D, F, P>
479where
480 C: Connection,
481 D: Delay,
482 F: EventHandler<P>,
483{
484 async fn connect_with_will<const W: usize>(
485 &mut self,
486 settings: &ConnectionSettings<'_>,
487 will: Option<Will<'_, W>>,
488 ) -> Result<(), ClientError> {
489 let mut properties = Vec::new();
490 properties
496 .push(ConnectProperty::TopicAliasMaximum(0.into()))
497 .unwrap();
498 let packet: Connect<'_, 1, W> = Connect::new(
499 settings.keep_alive,
500 settings.username,
501 settings.password,
502 settings.client_id,
503 true,
504 will,
505 properties,
506 );
507 self.client_state.connect(&packet)?;
508 self.send_wait_for_responses(packet).await
509 }
510 async fn connect(&mut self, settings: &ConnectionSettings<'_>) -> Result<(), ClientError> {
511 self.connect_with_will::<0>(settings, None).await
512 }
513
514 async fn disconnect(&mut self) -> Result<(), ClientError> {
515 let packet = self.client_state.disconnect()?;
516 self.send(packet).await
517 }
518
519 async fn publish_with_properties<'b, const PP: usize>(
520 &'b mut self,
521 topic_name: &'b str,
522 payload: &'b [u8],
523 qos: QualityOfService,
524 retain: bool,
525 properties: Vec<PublishProperty<'b>, PP>,
526 ) -> Result<(), ClientError> {
527 let packet = self
528 .client_state
529 .publish_with_properties(topic_name, payload, qos, retain, properties)?;
530 self.send_wait_for_responses(packet).await
531 }
532
533 async fn subscribe<'b>(
534 &'b mut self,
535 topic_name: &'b str,
536 maximum_qos: QualityOfService,
537 ) -> Result<(), ClientError> {
538 let packet = self.client_state.subscribe(topic_name, maximum_qos)?;
539 self.send_wait_for_responses(packet).await
540 }
541
542 async fn unsubscribe<'b>(&'b mut self, topic_name: &'b str) -> Result<(), ClientError> {
543 let packet = self.client_state.unsubscribe(topic_name)?;
544 self.send_wait_for_responses(packet).await
545 }
546
547 async fn send_ping(&mut self) -> Result<(), ClientError> {
548 let packet = self.client_state.send_ping()?;
549 self.send(packet).await
550 }
551
552 async fn poll(&mut self, wait: bool) -> Result<bool, ClientError> {
553 let to_send = {
561 let packet: Option<PacketGeneric<'_, P, 0, 0>> = if wait {
562 Some(self.packet_client.receive().await?)
563 } else {
564 self.packet_client.receive_if_ready().await?
565 };
566
567 if let Some(packet) = packet {
568 let event = self.client_state.receive(packet)?;
569
570 match event {
571 ClientStateReceiveEvent::Ack => {
572 self.event_handler
573 .handle_event(ClientReceivedEvent::Ack)
574 .await?;
575 None
576 }
577
578 ClientStateReceiveEvent::Publish { publish } => {
579 if publish.topic_name().is_empty() {
580 return Err(ClientError::EmptyTopicNameWithAliasesDisabled);
581 }
582 self.event_handler.handle_event(publish.into()).await?;
583 None
584 }
585
586 ClientStateReceiveEvent::PublishAndPuback { publish, puback } => {
587 if publish.topic_name().is_empty() {
588 return Err(ClientError::EmptyTopicNameWithAliasesDisabled);
589 }
590 self.event_handler.handle_event(publish.into()).await?;
591 Some(puback)
592 }
593
594 ClientStateReceiveEvent::SubscriptionGrantedBelowMaximumQos {
595 granted_qos,
596 maximum_qos,
597 } => {
598 self.event_handler
599 .handle_event(ClientReceivedEvent::SubscriptionGrantedBelowMaximumQos {
600 granted_qos,
601 maximum_qos,
602 })
603 .await?;
604 None
605 }
606
607 ClientStateReceiveEvent::PublishedMessageHadNoMatchingSubscribers => {
608 self.event_handler
609 .handle_event(
610 ClientReceivedEvent::PublishedMessageHadNoMatchingSubscribers,
611 )
612 .await?;
613 None
614 }
615
616 ClientStateReceiveEvent::NoSubscriptionExisted => {
617 self.event_handler
618 .handle_event(ClientReceivedEvent::NoSubscriptionExisted)
619 .await?;
620 None
621 }
622
623 ClientStateReceiveEvent::Disconnect { disconnect } => {
624 return Err(ClientError::Disconnected(*disconnect.reason_code()));
625 }
626 }
627 } else {
628 return Ok(false);
629 }
630 };
631
632 if let Some(packet) = to_send {
634 self.send(packet).await?;
635 }
636
637 Ok(true)
638 }
639
640 async fn perform<'b, const PP: usize>(
641 &'b mut self,
642 action: ClientAction<'b, PP>,
643 ) -> Result<(), ClientError> {
644 match action {
645 ClientAction::Subscribe {
646 topic_name,
647 maximum_qos,
648 } => self.subscribe(topic_name, maximum_qos).await,
649 ClientAction::Unsubscribe { topic_name } => self.unsubscribe(topic_name).await,
650 ClientAction::Publish {
651 topic_name,
652 payload,
653 qos,
654 retain,
655 } => self.publish(topic_name, payload, qos, retain).await,
656 ClientAction::PublishWithProperties {
657 topic_name,
658 payload,
659 qos,
660 retain,
661 properties,
662 } => {
663 self.publish_with_properties(topic_name, payload, qos, retain, properties)
664 .await
665 }
666 }
667 }
668}