Skip to main content

mountain_mqtt/
client.rs

1use core::{
2    fmt::{Display, Formatter},
3    str::Utf8Error,
4};
5
6use heapless::Vec;
7
8use crate::{
9    client_state::{ClientState, ClientStateError, ClientStateNoQueue, ClientStateReceiveEvent},
10    codec::write,
11    data::{
12        property::{ConnectProperty, PublishProperty},
13        quality_of_service::QualityOfService,
14        reason_code::DisconnectReasonCode,
15    },
16    error::{PacketReadError, PacketWriteError},
17    packet_client::{Connection, PacketClient},
18    packets::{
19        connect::{Connect, Will},
20        packet::{Packet, KEEP_ALIVE_DEFAULT},
21        packet_generic::PacketGeneric,
22        publish::{ApplicationMessage, Publish},
23    },
24};
25
26/// Errors produced when a [ClientNoQueue] event handler cannot handle
27/// a [ClientReceivedEvent]. These errors propagate to the user of the
28/// client, and so are likely to cause the client to be disconnected.
29/// Alternatively, an event handler can use another method to propagate
30/// an error if it does not wish for the client to be disconnected, for
31/// example logging a warning, or using another target for the error,
32/// for example if an event handler uses a channel to send valid events
33/// onwards, it could also have a channel for errors.
34#[derive(Debug, PartialEq, Clone, Copy)]
35pub enum EventHandlerError {
36    /// Application Message payload contained invalid utf8 data, when
37    /// a utf8 string was expected
38    Utf8(Utf8Error),
39
40    /// The topic of an application message is not expected, for example
41    /// it doesn't match our expected subscriptions
42    UnexpectedApplicationMessageTopic,
43
44    /// The contents of an application message are invalid for the handler,
45    /// and can't be parsed.
46    /// For example if a json string is expected, and invalid json is received,
47    /// or if the json received does not match the expected schema.
48    InvalidApplicationMessage,
49
50    /// The contents of an application message can be parsed, but are unexpected,
51    /// e.g. if they are received out of sequence
52    UnexpectedApplicationMessage,
53
54    /// A valid, expected event was received, but could not be
55    /// handled due to an overflow. For example if the messages are sent onwards
56    /// to a channel, and that channel is at capacity.
57    Overflow,
58
59    /// A valid, expected event was received, but could not be
60    /// handled due to the destination for events being closed. For example if the messages are sent onwards
61    /// to a channel, and that channel is closed.
62    Closed,
63
64    /// The corresponding [ClientReceivedEvent] was received, and is an error
65    /// for this event handler
66    SubscriptionGrantedBelowMaximumQos {
67        granted_qos: QualityOfService,
68        maximum_qos: QualityOfService,
69    },
70
71    /// The corresponding [ClientReceivedEvent] was received, and is an error
72    /// for this event handler
73    PublishedMessageHadNoMatchingSubscribers,
74
75    /// The corresponding [ClientReceivedEvent] was received, and is an error
76    /// for this event handler
77    NoSubscriptionExisted,
78}
/// defmt output for [EventHandlerError], only built with the `defmt` feature.
///
/// Every variant is written as its name. The inner [Utf8Error] of
/// [EventHandlerError::Utf8] is not included in the output.
#[cfg(feature = "defmt")]
impl defmt::Format for EventHandlerError {
    fn format(&self, f: defmt::Formatter) {
        // Arms are listed in the same order as the enum declaration.
        match self {
            EventHandlerError::Utf8(_) => defmt::write!(f, "Utf8"),
            EventHandlerError::UnexpectedApplicationMessageTopic => {
                defmt::write!(f, "UnexpectedApplicationMessageTopic")
            }
            EventHandlerError::InvalidApplicationMessage => {
                defmt::write!(f, "InvalidApplicationMessage")
            }
            EventHandlerError::UnexpectedApplicationMessage => {
                defmt::write!(f, "UnexpectedApplicationMessage")
            }
            EventHandlerError::Overflow => defmt::write!(f, "Overflow"),
            EventHandlerError::Closed => defmt::write!(f, "Closed"),
            EventHandlerError::SubscriptionGrantedBelowMaximumQos {
                granted_qos,
                maximum_qos,
            } => defmt::write!(
                f,
                "SubscriptionGrantedBelowMaximumQos(granted_qos: {}, maximum_qos: {})",
                granted_qos,
                maximum_qos
            ),
            EventHandlerError::PublishedMessageHadNoMatchingSubscribers => {
                defmt::write!(f, "PublishedMessageHadNoMatchingSubscribers")
            }
            EventHandlerError::NoSubscriptionExisted => {
                defmt::write!(f, "NoSubscriptionExisted")
            }
        }
    }
}
107
108impl From<Utf8Error> for EventHandlerError {
109    fn from(value: Utf8Error) -> Self {
110        Self::Utf8(value)
111    }
112}
113
114impl Display for EventHandlerError {
115    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
116        match self {
117            Self::Utf8(e) => write!(f, "Utf8({})", e),
118            Self::UnexpectedApplicationMessageTopic => {
119                write!(f, "UnexpectedApplicationMessageTopic")
120            }
121            Self::InvalidApplicationMessage => write!(f, "InvalidApplicationMessage"),
122            Self::UnexpectedApplicationMessage => write!(f, "UnexpectedApplicationMessage"),
123            Self::Overflow => write!(f, "Overflow"),
124            Self::SubscriptionGrantedBelowMaximumQos {
125                granted_qos,
126                maximum_qos,
127            } => write!(
128                f,
129                "SubscriptionGrantedBelowMaximumQos(granted_qos: {}, maximum_qos: {})",
130                granted_qos, maximum_qos
131            ),
132            Self::PublishedMessageHadNoMatchingSubscribers => {
133                write!(f, "PublishedMessageHadNoMatchingSubscribers")
134            }
135            Self::NoSubscriptionExisted => write!(f, "NoSubscriptionExisted"),
136            Self::Closed => write!(f, "Closed"),
137        }
138    }
139}
140
141/// [Client] error
142#[derive(Debug, PartialEq, Clone, Copy)]
143pub enum ClientError {
144    PacketWrite(PacketWriteError),
145    PacketRead(PacketReadError),
146    ClientState(ClientStateError),
147    TimeoutOnResponsePacket,
148    Disconnected(DisconnectReasonCode),
149    EventHandler(EventHandlerError),
150    /// Client received an empty topic name when it has disabled topic aliases
151    /// This indicates a server error, client should disconnect, it may send
152    /// a Disconnect with [DisconnectReasonCode::TopicAliasInvalid], on the assumption
153    /// that the packet also had some topic alias specified.
154    EmptyTopicNameWithAliasesDisabled,
155}
156
/// defmt output for [ClientError], only built with the `defmt` feature.
///
/// Variants wrapping a value are written as `Name(value)`, the remaining
/// variants as their name.
#[cfg(feature = "defmt")]
impl defmt::Format for ClientError {
    fn format(&self, f: defmt::Formatter) {
        match self {
            ClientError::PacketWrite(cause) => defmt::write!(f, "PacketWrite({})", cause),
            ClientError::PacketRead(cause) => defmt::write!(f, "PacketRead({})", cause),
            ClientError::ClientState(cause) => defmt::write!(f, "ClientState({})", cause),
            ClientError::TimeoutOnResponsePacket => {
                defmt::write!(f, "TimeoutOnResponsePacket")
            }
            ClientError::Disconnected(reason) => defmt::write!(f, "Disconnected({})", reason),
            ClientError::EventHandler(cause) => defmt::write!(f, "EventHandler({})", cause),
            ClientError::EmptyTopicNameWithAliasesDisabled => {
                defmt::write!(f, "EmptyTopicNameWithAliasesDisabled")
            }
        }
    }
}
173
174impl From<ClientStateError> for ClientError {
175    fn from(value: ClientStateError) -> Self {
176        ClientError::ClientState(value)
177    }
178}
179
180impl From<PacketWriteError> for ClientError {
181    fn from(value: PacketWriteError) -> Self {
182        ClientError::PacketWrite(value)
183    }
184}
185
186impl From<PacketReadError> for ClientError {
187    fn from(value: PacketReadError) -> Self {
188        ClientError::PacketRead(value)
189    }
190}
191
192impl From<EventHandlerError> for ClientError {
193    fn from(value: EventHandlerError) -> Self {
194        ClientError::EventHandler(value)
195    }
196}
197
198impl Display for ClientError {
199    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
200        match self {
201            Self::PacketWrite(e) => write!(f, "PacketWrite({})", e),
202            Self::PacketRead(e) => write!(f, "PacketRead({})", e),
203            Self::ClientState(e) => write!(f, "ClientState({})", e),
204            Self::TimeoutOnResponsePacket => write!(f, "TimeoutOnResponsePacket"),
205            Self::Disconnected(e) => write!(f, "Disconnected({})", e),
206            Self::EventHandler(e) => write!(f, "EventHandler({})", e),
207            Self::EmptyTopicNameWithAliasesDisabled => write!(f, "EmptyTopicWithAliasesDisabled"),
208        }
209    }
210}
211
212/// A simple client interface for connecting to an MQTT server
213#[allow(async_fn_in_trait)]
214pub trait Client<'a> {
215    /// Connect to server
216    async fn connect(&mut self, settings: &ConnectionSettings) -> Result<(), ClientError>;
217
218    /// Connect to server with a will
219    async fn connect_with_will<const W: usize>(
220        &mut self,
221        settings: &ConnectionSettings,
222        will: Option<Will<'_, W>>,
223    ) -> Result<(), ClientError>;
224
225    /// Disconnect from server
226    async fn disconnect(&mut self) -> Result<(), ClientError>;
227
228    /// Send a ping message to broker
229    async fn send_ping(&mut self) -> Result<(), ClientError>;
230
231    /// Poll for and handle at most one event
232    /// This updates the state of the client, and calls the event_handler if
233    /// a message is received
234    /// If wait is true, this will wait until an event is received, or the comms
235    /// are disconnected. Otherwise an event will only be waited for if at least one
236    /// byte of data is already available, indicating a packet should be available
237    /// soon.
238    /// On success, returns true if an event was handled, false if none was received
239    /// Errors indicate an invalid packet was received, message_target errored,
240    /// the received packet was unexpected based on our state, or the comms are
241    /// disconnected.
242    async fn poll(&mut self, wait: bool) -> Result<bool, ClientError>;
243
244    /// Subscribe to a topic
245    async fn subscribe<'b>(
246        &'b mut self,
247        topic_name: &'b str,
248        maximum_qos: QualityOfService,
249    ) -> Result<(), ClientError>;
250
251    /// Unsubscribe from a topic
252    async fn unsubscribe<'b>(&'b mut self, topic_name: &'b str) -> Result<(), ClientError>;
253
254    /// Publish a message with given payload to a given topic, with no properties
255    async fn publish<'b>(
256        &'b mut self,
257        topic_name: &'b str,
258        payload: &'b [u8],
259        qos: QualityOfService,
260        retain: bool,
261    ) -> Result<(), ClientError> {
262        self.publish_with_properties::<0>(topic_name, payload, qos, retain, Vec::new())
263            .await
264    }
265
266    /// Publish a message with given payload to a given topic, with properties
267    async fn publish_with_properties<'b, const P: usize>(
268        &'b mut self,
269        topic_name: &'b str,
270        payload: &'b [u8],
271        qos: QualityOfService,
272        retain: bool,
273        properties: Vec<PublishProperty<'b>, P>,
274    ) -> Result<(), ClientError>;
275
276    /// Perform an action (this replicates the functionality of
277    /// [Client::subscribe], [Client::unsubscribe] and [Client::publish])
278    /// but using an enum to represent the action.
279    async fn perform<'b, const P: usize>(
280        &'b mut self,
281        action: ClientAction<'b, P>,
282    ) -> Result<(), ClientError>;
283}
284
285pub enum ClientAction<'a, const P: usize> {
286    Subscribe {
287        topic_name: &'a str,
288        maximum_qos: QualityOfService,
289    },
290    Unsubscribe {
291        topic_name: &'a str,
292    },
293    Publish {
294        topic_name: &'a str,
295        payload: &'a [u8],
296        qos: QualityOfService,
297        retain: bool,
298    },
299    PublishWithProperties {
300        topic_name: &'a str,
301        payload: &'a [u8],
302        qos: QualityOfService,
303        retain: bool,
304        properties: Vec<PublishProperty<'a>, P>,
305    },
306}
307
/// Source of asynchronous pauses, used by [ClientNoQueue] between polls
/// while it waits for response packets.
// Silences the `async_fn_in_trait` lint for the public async method below.
#[allow(async_fn_in_trait)]
pub trait Delay {
    /// Pause execution for no less than `us` microseconds.
    ///
    /// An implementation may pause for longer where precision or timing
    /// issues require it.
    async fn delay_us(&mut self, us: u32);
}
314
/// Settings used to build the Connect packet when a [Client] connects to a
/// server.
pub struct ConnectionSettings<'a> {
    /// Keep alive value placed in the Connect packet.
    keep_alive: u16,
    /// Username placed in the Connect packet, if any.
    username: Option<&'a str>,
    /// Password placed in the Connect packet, if any.
    password: Option<&'a [u8]>,
    /// Client id placed in the Connect packet.
    client_id: &'a str,
}
321
322impl<'a> ConnectionSettings<'a> {
323    pub fn unauthenticated(client_id: &'a str) -> ConnectionSettings<'a> {
324        Self {
325            keep_alive: KEEP_ALIVE_DEFAULT,
326            username: None,
327            password: None,
328            client_id,
329        }
330    }
331    pub fn client_id(&self) -> &'a str {
332        self.client_id
333    }
334}
335
336#[derive(Debug, PartialEq)]
337pub enum ClientReceivedEvent<'a, const P: usize> {
338    /// Client received an application message published to a subscribed topic
339    ApplicationMessage(ApplicationMessage<'a, P>),
340
341    /// Client received an acknowledgement/response for a previous message sent
342    /// to the server (e.g. Connack, Puback, Suback, Unsuback, Pingresp)
343    /// This can be used to track whether the client is still connected to the
344    /// server - in particular, there will be an Ack event per ping response.
345    /// An approach is to call [Client::send_ping] at least every T seconds,
346    /// and consider the client to be still connected if there has been an
347    /// Ack within the last T+N seconds, for some multiple N depending on
348    /// how long a latency/interruption can be tolerated. To tolerate loss
349    /// of ping request/response packets, N should be increased so that T+N
350    /// is a multiple of T.
351    Ack,
352
353    /// A subscription was granted but was at lower qos than the maximum requested
354    /// This may or may not require action depending on client requirements -
355    /// it means that the given subscription will receive published messages at
356    /// only the granted qos - if the requested maximum qos was absolutely required
357    /// then the client could respond by showing an error to the user stating the
358    /// server is incompatible, or possibly trying to unsubscribe and resubscribe,
359    /// assuming this is expected to make any difference with the server(s) in use.
360    SubscriptionGrantedBelowMaximumQos {
361        granted_qos: QualityOfService,
362        maximum_qos: QualityOfService,
363    },
364
365    /// A published message was received at the server, but had no matching subscribers and
366    /// so did not reach any receivers
367    /// This may or may not require action depending on client requirements / expectations
368    /// E.g. if it was expected there would be subscribers, the client could try resending
369    /// the message later
370    PublishedMessageHadNoMatchingSubscribers,
371
372    // Server processed an unsubscribe request, but no such subscription existed on the server,
373    // so nothing changed.
374    /// This may or may not require action depending on client requirements / expectations
375    /// E.g. if it was expected there would be a subscription, the client could produce
376    /// an error, and the user of the client might try reconnecting to the server to set
377    /// up subscriptions again.
378    NoSubscriptionExisted,
379}
380
381impl<'a, const P: usize> From<Publish<'a, P>> for ClientReceivedEvent<'a, P> {
382    fn from(value: Publish<'a, P>) -> Self {
383        Self::ApplicationMessage(value.into())
384    }
385}
386
387#[allow(async_fn_in_trait)]
388pub trait EventHandler<const P: usize> {
389    async fn handle_event(
390        &mut self,
391        event: ClientReceivedEvent<P>,
392    ) -> Result<(), EventHandlerError>;
393}
394
395pub struct ClientNoQueue<'a, C, D, F, const P: usize>
396where
397    C: Connection,
398    D: Delay,
399    F: EventHandler<P>,
400{
401    packet_client: PacketClient<'a, C>,
402    client_state: ClientStateNoQueue,
403    delay: D,
404    timeout_millis: u32,
405    event_handler: F,
406}
407
408impl<'a, C, D, F, const P: usize> ClientNoQueue<'a, C, D, F, P>
409where
410    C: Connection,
411    D: Delay,
412    F: EventHandler<P>,
413{
414    pub fn new(
415        connection: C,
416        buf: &'a mut [u8],
417        delay: D,
418        timeout_millis: u32,
419        event_handler: F,
420    ) -> Self {
421        let packet_client = PacketClient::new(connection, buf);
422        let client_state = ClientStateNoQueue::default();
423        Self {
424            packet_client,
425            client_state,
426            delay,
427            timeout_millis,
428            event_handler,
429        }
430    }
431
432    async fn wait_for_responses(&mut self, timeout_millis: u32) -> Result<(), ClientError> {
433        let mut elapsed = 0;
434        let mut waiting = self.client_state.waiting_for_responses();
435        while waiting && elapsed <= timeout_millis {
436            self.poll(false).await?;
437            waiting = self.client_state.waiting_for_responses();
438            elapsed += 1;
439            self.delay.delay_us(1000).await;
440        }
441
442        if waiting {
443            Err(ClientError::TimeoutOnResponsePacket)
444        } else {
445            Ok(())
446        }
447    }
448
449    async fn send_wait_for_responses<PW>(&mut self, packet: PW) -> Result<(), ClientError>
450    where
451        PW: Packet + write::Write,
452    {
453        match self.packet_client.send(packet).await {
454            Ok(()) => {
455                self.wait_for_responses(self.timeout_millis).await?;
456                Ok(())
457            }
458            Err(e) => {
459                self.client_state.error();
460                Err(e.into())
461            }
462        }
463    }
464
465    async fn send<PW>(&mut self, packet: PW) -> Result<(), ClientError>
466    where
467        PW: Packet + write::Write,
468    {
469        let r = self.packet_client.send(packet).await;
470        if r.is_err() {
471            self.client_state.error();
472        }
473        r?;
474        Ok(())
475    }
476}
477
478impl<'a, C, D, F, const P: usize> Client<'a> for ClientNoQueue<'a, C, D, F, P>
479where
480    C: Connection,
481    D: Delay,
482    F: EventHandler<P>,
483{
484    async fn connect_with_will<const W: usize>(
485        &mut self,
486        settings: &ConnectionSettings<'_>,
487        will: Option<Will<'_, W>>,
488    ) -> Result<(), ClientError> {
489        let mut properties = Vec::new();
490        // By setting maximum topic alias to 0, we prevent the server
491        // trying to use aliases, which we don't support. They are optional
492        // and only provide for reduced packet size, but would require storing
493        // topic names from the server for the length of the connection,
494        // which might be awkward without alloc.
495        properties
496            .push(ConnectProperty::TopicAliasMaximum(0.into()))
497            .unwrap();
498        let packet: Connect<'_, 1, W> = Connect::new(
499            settings.keep_alive,
500            settings.username,
501            settings.password,
502            settings.client_id,
503            true,
504            will,
505            properties,
506        );
507        self.client_state.connect(&packet)?;
508        self.send_wait_for_responses(packet).await
509    }
510    async fn connect(&mut self, settings: &ConnectionSettings<'_>) -> Result<(), ClientError> {
511        self.connect_with_will::<0>(settings, None).await
512    }
513
514    async fn disconnect(&mut self) -> Result<(), ClientError> {
515        let packet = self.client_state.disconnect()?;
516        self.send(packet).await
517    }
518
519    async fn publish_with_properties<'b, const PP: usize>(
520        &'b mut self,
521        topic_name: &'b str,
522        payload: &'b [u8],
523        qos: QualityOfService,
524        retain: bool,
525        properties: Vec<PublishProperty<'b>, PP>,
526    ) -> Result<(), ClientError> {
527        let packet = self
528            .client_state
529            .publish_with_properties(topic_name, payload, qos, retain, properties)?;
530        self.send_wait_for_responses(packet).await
531    }
532
533    async fn subscribe<'b>(
534        &'b mut self,
535        topic_name: &'b str,
536        maximum_qos: QualityOfService,
537    ) -> Result<(), ClientError> {
538        let packet = self.client_state.subscribe(topic_name, maximum_qos)?;
539        self.send_wait_for_responses(packet).await
540    }
541
542    async fn unsubscribe<'b>(&'b mut self, topic_name: &'b str) -> Result<(), ClientError> {
543        let packet = self.client_state.unsubscribe(topic_name)?;
544        self.send_wait_for_responses(packet).await
545    }
546
547    async fn send_ping(&mut self) -> Result<(), ClientError> {
548        let packet = self.client_state.send_ping()?;
549        self.send(packet).await
550    }
551
552    async fn poll(&mut self, wait: bool) -> Result<bool, ClientError> {
553        // We need to wrap up like this so we can drop the mutable reference to
554        // self.packet_client needed to receive data - this reference needs to live
555        // as long as the returned data from the client, so we need to drop everything
556        // but the packet we need to send, in order to be able to mutably borrow
557        // packet_client again to actually do the send.
558        // Note we allow 0 will properties and additional subscriptions, since we
559        // shouldn't receive any messages using these, since we are a client.
560        let to_send = {
561            let packet: Option<PacketGeneric<'_, P, 0, 0>> = if wait {
562                Some(self.packet_client.receive().await?)
563            } else {
564                self.packet_client.receive_if_ready().await?
565            };
566
567            if let Some(packet) = packet {
568                let event = self.client_state.receive(packet)?;
569
570                match event {
571                    ClientStateReceiveEvent::Ack => {
572                        self.event_handler
573                            .handle_event(ClientReceivedEvent::Ack)
574                            .await?;
575                        None
576                    }
577
578                    ClientStateReceiveEvent::Publish { publish } => {
579                        if publish.topic_name().is_empty() {
580                            return Err(ClientError::EmptyTopicNameWithAliasesDisabled);
581                        }
582                        self.event_handler.handle_event(publish.into()).await?;
583                        None
584                    }
585
586                    ClientStateReceiveEvent::PublishAndPuback { publish, puback } => {
587                        if publish.topic_name().is_empty() {
588                            return Err(ClientError::EmptyTopicNameWithAliasesDisabled);
589                        }
590                        self.event_handler.handle_event(publish.into()).await?;
591                        Some(puback)
592                    }
593
594                    ClientStateReceiveEvent::SubscriptionGrantedBelowMaximumQos {
595                        granted_qos,
596                        maximum_qos,
597                    } => {
598                        self.event_handler
599                            .handle_event(ClientReceivedEvent::SubscriptionGrantedBelowMaximumQos {
600                                granted_qos,
601                                maximum_qos,
602                            })
603                            .await?;
604                        None
605                    }
606
607                    ClientStateReceiveEvent::PublishedMessageHadNoMatchingSubscribers => {
608                        self.event_handler
609                            .handle_event(
610                                ClientReceivedEvent::PublishedMessageHadNoMatchingSubscribers,
611                            )
612                            .await?;
613                        None
614                    }
615
616                    ClientStateReceiveEvent::NoSubscriptionExisted => {
617                        self.event_handler
618                            .handle_event(ClientReceivedEvent::NoSubscriptionExisted)
619                            .await?;
620                        None
621                    }
622
623                    ClientStateReceiveEvent::Disconnect { disconnect } => {
624                        return Err(ClientError::Disconnected(*disconnect.reason_code()));
625                    }
626                }
627            } else {
628                return Ok(false);
629            }
630        };
631
632        // Send any resulting packet, no need to wait for responses
633        if let Some(packet) = to_send {
634            self.send(packet).await?;
635        }
636
637        Ok(true)
638    }
639
640    async fn perform<'b, const PP: usize>(
641        &'b mut self,
642        action: ClientAction<'b, PP>,
643    ) -> Result<(), ClientError> {
644        match action {
645            ClientAction::Subscribe {
646                topic_name,
647                maximum_qos,
648            } => self.subscribe(topic_name, maximum_qos).await,
649            ClientAction::Unsubscribe { topic_name } => self.unsubscribe(topic_name).await,
650            ClientAction::Publish {
651                topic_name,
652                payload,
653                qos,
654                retain,
655            } => self.publish(topic_name, payload, qos, retain).await,
656            ClientAction::PublishWithProperties {
657                topic_name,
658                payload,
659                qos,
660                retain,
661                properties,
662            } => {
663                self.publish_with_properties(topic_name, payload, qos, retain, properties)
664                    .await
665            }
666        }
667    }
668}