rust_mqtt/client/
client.rs

1/*
2 * MIT License
3 *
4 * Copyright (c) [2022] [Ondrej Babec <ond.babec@gmail.com>]
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to deal
8 * in the Software without restriction, including without limitation the rights
9 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 * copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in all
14 * copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 * SOFTWARE.
23 */
24
25use embedded_io::ReadReady;
26use embedded_io_async::{Read, Write};
27use heapless::Vec;
28use rand_core::RngCore;
29
30use crate::client::client_config::ClientConfig;
31use crate::packet::v5::publish_packet::QualityOfService::{self, QoS1};
32use crate::packet::v5::reason_codes::ReasonCode;
33
34use super::raw_client::{Event, RawMqttClient};
35
36pub struct MqttClient<'a, T, const MAX_PROPERTIES: usize, R: RngCore>
37where
38    T: Read + Write,
39{
40    raw: RawMqttClient<'a, T, MAX_PROPERTIES, R>,
41}
42
43impl<'a, T, const MAX_PROPERTIES: usize, R> MqttClient<'a, T, MAX_PROPERTIES, R>
44where
45    T: Read + Write,
46    R: RngCore,
47{
48    pub fn new(
49        network_driver: T,
50        buffer: &'a mut [u8],
51        buffer_len: usize,
52        recv_buffer: &'a mut [u8],
53        recv_buffer_len: usize,
54        config: ClientConfig<'a, MAX_PROPERTIES, R>,
55    ) -> Self {
56        Self {
57            raw: RawMqttClient::new(
58                network_driver,
59                buffer,
60                buffer_len,
61                recv_buffer,
62                recv_buffer_len,
63                config,
64            ),
65        }
66    }
67
68    /// Method allows client connect to server. Client is connecting to the specified broker
69    /// in the `ClientConfig`. Method selects proper implementation of the MQTT version based on the config.
70    /// If the connection to the broker fails, method returns Err variable that contains
71    /// Reason codes returned from the broker.
72    pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), ReasonCode> {
73        self.raw.connect_to_broker().await?;
74
75        match self.raw.poll::<0>().await? {
76            Event::Connack => Ok(()),
77            Event::Disconnect(reason) => Err(reason),
78            // If an application message comes at this moment, it is lost.
79            _ => Err(ReasonCode::ImplementationSpecificError),
80        }
81    }
82
83    /// Method allows client disconnect from the server. Client disconnects from the specified broker
84    /// in the `ClientConfig`. Method selects proper implementation of the MQTT version based on the config.
85    /// If the disconnect from the broker fails, method returns Err variable that contains
86    /// Reason codes returned from the broker.
87    pub async fn disconnect<'b>(&'b mut self) -> Result<(), ReasonCode> {
88        self.raw.disconnect().await?;
89        Ok(())
90    }
91
92    /// Method allows sending message to broker specified from the ClientConfig. Client sends the
93    /// message from the parameter `message` to the topic `topic_name` on the broker
94    /// specified in the ClientConfig. If the send fails method returns Err with reason code
95    /// received by broker.
96    pub async fn send_message<'b>(
97        &'b mut self,
98        topic_name: &'b str,
99        message: &'b [u8],
100        qos: QualityOfService,
101        retain: bool,
102    ) -> Result<(), ReasonCode> {
103        let identifier = self
104            .raw
105            .send_message(topic_name, message, qos, retain)
106            .await?;
107
108        // QoS1
109        if qos == QoS1 {
110            match self.raw.poll::<0>().await? {
111                Event::Puback(ack_identifier, matching_subscriber) => {
112                    if !matching_subscriber {
113                        Err(ReasonCode::NoMatchingSubscribers)
114                    } else if identifier == ack_identifier {
115                        Ok(())
116                    } else {
117                        Err(ReasonCode::PacketIdentifierNotFound)
118                    }
119                }
120                Event::Disconnect(reason) => Err(reason),
121                // If an application message comes at this moment, it is lost.
122                _ => Err(ReasonCode::ImplementationSpecificError),
123            }
124        } else {
125            Ok(())
126        }
127    }
128
129    /// Method allows client subscribe to multiple topics specified in the parameter
130    /// `topic_names` on the broker specified in the `ClientConfig`. Generics `TOPICS`
131    /// sets the value of the `topics_names` vector. MQTT protocol implementation
132    /// is selected automatically.
133    pub async fn subscribe_to_topics<'b, const TOPICS: usize>(
134        &'b mut self,
135        topic_names: &'b Vec<&'b str, TOPICS>,
136    ) -> Result<(), ReasonCode> {
137        let identifier = self.raw.subscribe_to_topics(topic_names).await?;
138
139        match self.raw.poll::<TOPICS>().await? {
140            Event::Suback(ack_identifier) => {
141                if identifier == ack_identifier {
142                    Ok(())
143                } else {
144                    Err(ReasonCode::PacketIdentifierNotFound)
145                }
146            }
147            Event::Disconnect(reason) => Err(reason),
148            // If an application message comes at this moment, it is lost.
149            _ => Err(ReasonCode::ImplementationSpecificError),
150        }
151    }
152
153    /// Method allows client unsubscribe from the topic specified in the parameter
154    /// `topic_name` on the broker from the `ClientConfig`. MQTT protocol implementation
155    /// is selected automatically.
156    pub async fn unsubscribe_from_topic<'b>(
157        &'b mut self,
158        topic_name: &'b str,
159    ) -> Result<(), ReasonCode> {
160        let identifier = self.raw.unsubscribe_from_topic(topic_name).await?;
161
162        match self.raw.poll::<0>().await? {
163            Event::Unsuback(ack_identifier) => {
164                if identifier == ack_identifier {
165                    Ok(())
166                } else {
167                    Err(ReasonCode::PacketIdentifierNotFound)
168                }
169            }
170            Event::Disconnect(reason) => Err(reason),
171            // If an application message comes at this moment, it is lost.
172            _ => Err(ReasonCode::ImplementationSpecificError),
173        }
174    }
175
176    /// Method allows client subscribe to multiple topics specified in the parameter
177    /// `topic_name` on the broker specified in the `ClientConfig`. MQTT protocol implementation
178    /// is selected automatically.
179    pub async fn subscribe_to_topic<'b>(
180        &'b mut self,
181        topic_name: &'b str,
182    ) -> Result<(), ReasonCode> {
183        let mut topic_names = Vec::<&'b str, 1>::new();
184        topic_names.push(topic_name).unwrap();
185
186        let identifier = self.raw.subscribe_to_topics(&topic_names).await?;
187
188        match self.raw.poll::<1>().await? {
189            Event::Suback(ack_identifier) => {
190                if identifier == ack_identifier {
191                    Ok(())
192                } else {
193                    Err(ReasonCode::PacketIdentifierNotFound)
194                }
195            }
196            Event::Disconnect(reason) => Err(reason),
197            // If an application message comes at this moment, it is lost.
198            _ => Err(ReasonCode::ImplementationSpecificError),
199        }
200    }
201
202    /// Method allows client receive a message. The work of this method strictly depends on the
203    /// network implementation passed in the `ClientConfig`. It expects the PUBLISH packet
204    /// from the broker.
205    pub async fn receive_message<'b>(&'b mut self) -> Result<(&'b str, &'b [u8]), ReasonCode> {
206        match self.raw.poll::<0>().await? {
207            Event::Message(topic, payload) => Ok((topic, payload)),
208            Event::Disconnect(reason) => Err(reason),
209            // If an application message comes at this moment, it is lost.
210            _ => Err(ReasonCode::ImplementationSpecificError),
211        }
212    }
213
214    /// Method allows client send PING message to the broker specified in the `ClientConfig`.
215    /// If there is expectation for long running connection. Method should be executed
216    /// regularly by the timer that counts down the session expiry interval.
217    pub async fn send_ping<'b>(&'b mut self) -> Result<(), ReasonCode> {
218        self.raw.send_ping().await?;
219
220        match self.raw.poll::<0>().await? {
221            Event::Pingresp => Ok(()),
222            Event::Disconnect(reason) => Err(reason),
223            // If an application message comes at this moment, it is lost.
224            _ => Err(ReasonCode::ImplementationSpecificError),
225        }
226    }
227}
228
229impl<'a, T, const MAX_PROPERTIES: usize, R> MqttClient<'a, T, MAX_PROPERTIES, R>
230where
231    T: Read + Write + ReadReady,
232    R: RngCore,
233{
234    /// Receive a message if one is ready. The work of this method strictly depends on the
235    /// network implementation passed in the `ClientConfig`. It expects the PUBLISH packet
236    /// from the broker.
237    pub async fn receive_message_if_ready<'b>(
238        &'b mut self,
239    ) -> Result<Option<(&'b str, &'b [u8])>, ReasonCode> {
240        match self.raw.poll_if_ready::<0>().await? {
241            None => Ok(None),
242            Some(Event::Message(topic, payload)) => Ok(Some((topic, payload))),
243            Some(Event::Disconnect(reason)) => Err(reason),
244            // If an application message comes at this moment, it is lost.
245            _ => Err(ReasonCode::ImplementationSpecificError),
246        }
247    }
248}