rust_mqtt/client/
client.rs

1/*
2 * MIT License
3 *
4 * Copyright (c) [2022] [Ondrej Babec <ond.babec@gmail.com>]
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to deal
8 * in the Software without restriction, including without limitation the rights
9 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 * copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in all
14 * copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 * SOFTWARE.
23 */
24
25use embedded_io_async::{Read, Write};
26use heapless::Vec;
27use rand_core::RngCore;
28
29use crate::client::client_config::ClientConfig;
30use crate::packet::v5::publish_packet::QualityOfService::{self, QoS1};
31use crate::packet::v5::reason_codes::ReasonCode;
32
33use super::raw_client::{Event, RawMqttClient};
34
35pub struct MqttClient<'a, T, const MAX_PROPERTIES: usize, R: RngCore>
36where
37    T: Read + Write,
38{
39    raw: RawMqttClient<'a, T, MAX_PROPERTIES, R>,
40}
41
42impl<'a, T, const MAX_PROPERTIES: usize, R> MqttClient<'a, T, MAX_PROPERTIES, R>
43where
44    T: Read + Write,
45    R: RngCore,
46{
47    pub fn new(
48        network_driver: T,
49        buffer: &'a mut [u8],
50        buffer_len: usize,
51        recv_buffer: &'a mut [u8],
52        recv_buffer_len: usize,
53        config: ClientConfig<'a, MAX_PROPERTIES, R>,
54    ) -> Self {
55        Self {
56            raw: RawMqttClient::new(
57                network_driver,
58                buffer,
59                buffer_len,
60                recv_buffer,
61                recv_buffer_len,
62                config,
63            ),
64        }
65    }
66
67    /// Method allows client connect to server. Client is connecting to the specified broker
68    /// in the `ClientConfig`. Method selects proper implementation of the MQTT version based on the config.
69    /// If the connection to the broker fails, method returns Err variable that contains
70    /// Reason codes returned from the broker.
71    pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), ReasonCode> {
72        self.raw.connect_to_broker().await?;
73
74        match self.raw.poll::<0>().await? {
75            Event::Connack => Ok(()),
76            Event::Disconnect(reason) => Err(reason),
77            // If an application message comes at this moment, it is lost.
78            _ => Err(ReasonCode::ImplementationSpecificError),
79        }
80    }
81
82    /// Method allows client disconnect from the server. Client disconnects from the specified broker
83    /// in the `ClientConfig`. Method selects proper implementation of the MQTT version based on the config.
84    /// If the disconnect from the broker fails, method returns Err variable that contains
85    /// Reason codes returned from the broker.
86    pub async fn disconnect<'b>(&'b mut self) -> Result<(), ReasonCode> {
87        self.raw.disconnect().await?;
88        Ok(())
89    }
90
91    /// Method allows sending message to broker specified from the ClientConfig. Client sends the
92    /// message from the parameter `message` to the topic `topic_name` on the broker
93    /// specified in the ClientConfig. If the send fails method returns Err with reason code
94    /// received by broker.
95    pub async fn send_message<'b>(
96        &'b mut self,
97        topic_name: &'b str,
98        message: &'b [u8],
99        qos: QualityOfService,
100        retain: bool,
101    ) -> Result<(), ReasonCode> {
102        let identifier = self
103            .raw
104            .send_message(topic_name, message, qos, retain)
105            .await?;
106
107        // QoS1
108        if qos == QoS1 {
109            match self.raw.poll::<0>().await? {
110                Event::Puback(ack_identifier) => {
111                    if identifier == ack_identifier {
112                        Ok(())
113                    } else {
114                        Err(ReasonCode::PacketIdentifierNotFound)
115                    }
116                }
117                Event::Disconnect(reason) => Err(reason),
118                // If an application message comes at this moment, it is lost.
119                _ => Err(ReasonCode::ImplementationSpecificError),
120            }
121        } else {
122            Ok(())
123        }
124    }
125
126    /// Method allows client subscribe to multiple topics specified in the parameter
127    /// `topic_names` on the broker specified in the `ClientConfig`. Generics `TOPICS`
128    /// sets the value of the `topics_names` vector. MQTT protocol implementation
129    /// is selected automatically.
130    pub async fn subscribe_to_topics<'b, const TOPICS: usize>(
131        &'b mut self,
132        topic_names: &'b Vec<&'b str, TOPICS>,
133    ) -> Result<(), ReasonCode> {
134        let identifier = self.raw.subscribe_to_topics(topic_names).await?;
135
136        match self.raw.poll::<TOPICS>().await? {
137            Event::Suback(ack_identifier) => {
138                if identifier == ack_identifier {
139                    Ok(())
140                } else {
141                    Err(ReasonCode::PacketIdentifierNotFound)
142                }
143            }
144            Event::Disconnect(reason) => Err(reason),
145            // If an application message comes at this moment, it is lost.
146            _ => Err(ReasonCode::ImplementationSpecificError),
147        }
148    }
149
150    /// Method allows client unsubscribe from the topic specified in the parameter
151    /// `topic_name` on the broker from the `ClientConfig`. MQTT protocol implementation
152    /// is selected automatically.
153    pub async fn unsubscribe_from_topic<'b>(
154        &'b mut self,
155        topic_name: &'b str,
156    ) -> Result<(), ReasonCode> {
157        let identifier = self.raw.unsubscribe_from_topic(topic_name).await?;
158
159        match self.raw.poll::<0>().await? {
160            Event::Unsuback(ack_identifier) => {
161                if identifier == ack_identifier {
162                    Ok(())
163                } else {
164                    Err(ReasonCode::PacketIdentifierNotFound)
165                }
166            }
167            Event::Disconnect(reason) => Err(reason),
168            // If an application message comes at this moment, it is lost.
169            _ => Err(ReasonCode::ImplementationSpecificError),
170        }
171    }
172
173    /// Method allows client subscribe to multiple topics specified in the parameter
174    /// `topic_name` on the broker specified in the `ClientConfig`. MQTT protocol implementation
175    /// is selected automatically.
176    pub async fn subscribe_to_topic<'b>(
177        &'b mut self,
178        topic_name: &'b str,
179    ) -> Result<(), ReasonCode> {
180        let mut topic_names = Vec::<&'b str, 1>::new();
181        topic_names.push(topic_name).unwrap();
182
183        let identifier = self.raw.subscribe_to_topics(&topic_names).await?;
184
185        match self.raw.poll::<1>().await? {
186            Event::Suback(ack_identifier) => {
187                if identifier == ack_identifier {
188                    Ok(())
189                } else {
190                    Err(ReasonCode::PacketIdentifierNotFound)
191                }
192            }
193            Event::Disconnect(reason) => Err(reason),
194            // If an application message comes at this moment, it is lost.
195            _ => Err(ReasonCode::ImplementationSpecificError),
196        }
197    }
198
199    /// Method allows client receive a message. The work of this method strictly depends on the
200    /// network implementation passed in the `ClientConfig`. It expects the PUBLISH packet
201    /// from the broker.
202    pub async fn receive_message<'b>(&'b mut self) -> Result<(&'b str, &'b [u8]), ReasonCode> {
203        match self.raw.poll::<0>().await? {
204            Event::Message(topic, payload) => Ok((topic, payload)),
205            Event::Disconnect(reason) => Err(reason),
206            // If an application message comes at this moment, it is lost.
207            _ => Err(ReasonCode::ImplementationSpecificError),
208        }
209    }
210
211    /// Method allows client send PING message to the broker specified in the `ClientConfig`.
212    /// If there is expectation for long running connection. Method should be executed
213    /// regularly by the timer that counts down the session expiry interval.
214    pub async fn send_ping<'b>(&'b mut self) -> Result<(), ReasonCode> {
215        self.raw.send_ping().await?;
216
217        match self.raw.poll::<0>().await? {
218            Event::Pingresp => Ok(()),
219            Event::Disconnect(reason) => Err(reason),
220            // If an application message comes at this moment, it is lost.
221            _ => Err(ReasonCode::ImplementationSpecificError),
222        }
223    }
224}